# test_support.py -- revision 6de9e938a5f70128a77c589b3ec40936f6335c1c
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.test_support':
4    raise ImportError('test_support must be imported from the test package')
5
6import contextlib
7import errno
8import functools
9import gc
10import socket
11import sys
12import os
13import platform
14import shutil
15import warnings
16import unittest
17import importlib
18import UserDict
19import re
20
21__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
22           "verbose", "use_resources", "max_memuse", "record_original_stdout",
23           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
24           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
25           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
26           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
27           "open_urlresource", "check_warnings", "check_py3k_warnings",
28           "CleanImport", "EnvironmentVarGuard", "captured_output",
29           "captured_stdout", "TransientResource", "transient_internet",
30           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
31           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
32           "threading_cleanup", "reap_children", "cpython_only",
33           "check_impl_detail", "get_attribute", "py3k_bytes"]
34
35class Error(Exception):
36    """Base class for regression test exceptions."""
37
38class TestFailed(Error):
39    """Test failed."""
40
41class ResourceDenied(unittest.SkipTest):
42    """Test skipped because it requested a disallowed resource.
43
44    This is raised when a test calls requires() for a resource that
45    has not be enabled.  It is used to distinguish between expected
46    and unexpected skips.
47    """
48
49@contextlib.contextmanager
50def _ignore_deprecated_imports(ignore=True):
51    """Context manager to suppress package and module deprecation
52    warnings when importing them.
53
54    If ignore is False, this context manager has no effect."""
55    if ignore:
56        with warnings.catch_warnings():
57            warnings.filterwarnings("ignore", ".+ (module|package)",
58                                    DeprecationWarning)
59            yield
60    else:
61        yield
62
63
64def import_module(name, deprecated=False):
65    """Import and return the module to be tested, raising SkipTest if
66    it is not available.
67
68    If deprecated is True, any module or package deprecation messages
69    will be suppressed."""
70    with _ignore_deprecated_imports(deprecated):
71        try:
72            return importlib.import_module(name)
73        except ImportError, msg:
74            raise unittest.SkipTest(str(msg))
75
76
77def _save_and_remove_module(name, orig_modules):
78    """Helper function to save and remove a module from sys.modules
79
80       Return value is True if the module was in sys.modules and
81       False otherwise."""
82    saved = True
83    try:
84        orig_modules[name] = sys.modules[name]
85    except KeyError:
86        saved = False
87    else:
88        del sys.modules[name]
89    return saved
90
91
92def _save_and_block_module(name, orig_modules):
93    """Helper function to save and block a module in sys.modules
94
95       Return value is True if the module was in sys.modules and
96       False otherwise."""
97    saved = True
98    try:
99        orig_modules[name] = sys.modules[name]
100    except KeyError:
101        saved = False
102    sys.modules[name] = 0
103    return saved
104
105
106def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
107    """Imports and returns a module, deliberately bypassing the sys.modules cache
108    and importing a fresh copy of the module. Once the import is complete,
109    the sys.modules cache is restored to its original state.
110
111    Modules named in fresh are also imported anew if needed by the import.
112
113    Importing of modules named in blocked is prevented while the fresh import
114    takes place.
115
116    If deprecated is True, any module or package deprecation messages
117    will be suppressed."""
118    # NOTE: test_heapq and test_warnings include extra sanity checks to make
119    # sure that this utility function is working as expected
120    with _ignore_deprecated_imports(deprecated):
121        # Keep track of modules saved for later restoration as well
122        # as those which just need a blocking entry removed
123        orig_modules = {}
124        names_to_remove = []
125        _save_and_remove_module(name, orig_modules)
126        try:
127            for fresh_name in fresh:
128                _save_and_remove_module(fresh_name, orig_modules)
129            for blocked_name in blocked:
130                if not _save_and_block_module(blocked_name, orig_modules):
131                    names_to_remove.append(blocked_name)
132            fresh_module = importlib.import_module(name)
133        finally:
134            for orig_name, module in orig_modules.items():
135                sys.modules[orig_name] = module
136            for name_to_remove in names_to_remove:
137                del sys.modules[name_to_remove]
138        return fresh_module
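
# Illustrative sketch (editor's addition, not part of the original module):
# import_fresh_module() is typically used to compare a pure Python module
# with its C accelerator by blocking or forcing the accelerator import.
# The module names below are examples, mirroring test_warnings/test_heapq.
def _example_import_fresh_module():
    py_warnings = import_fresh_module('warnings', blocked=['_warnings'])
    py_heapq = import_fresh_module('heapq', blocked=['_heapq'])
    c_heapq = import_fresh_module('heapq', fresh=['_heapq'])
    return py_warnings, py_heapq, c_heapq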


def get_attribute(obj, name):
    """Get an attribute, raising SkipTest if AttributeError is raised."""
    try:
        attribute = getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("module %s has no attribute %s" % (
            obj.__name__, name))
    else:
        return attribute


verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    return _original_stdout or sys.stdout

def unload(name):
    try:
        del sys.modules[name]
    except KeyError:
        pass

def unlink(filename):
    try:
        os.unlink(filename)
    except OSError:
        pass

def rmtree(path):
    try:
        shutil.rmtree(path)
    except OSError, e:
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise

def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    unload(modname)
    for dirname in sys.path:
        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
        # the chance exists that there is no .pyc (and thus the 'try' statement
        # is exited) but there is a .pyo file.
        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))

def is_resource_enabled(resource):
    """Test whether a resource is enabled.  Known resources are set by
    regrtest.py."""
    return use_resources is not None and resource in use_resources

def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ (the test is being run directly rather
    than under regrtest.py), the resource is considered available and the
    function returns immediately."""
    # see if the caller's module is __main__ - if so, treat as if
    # the resource was set
    if sys._getframe(1).f_globals.get("__name__") == "__main__":
        return
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)
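
# Illustrative sketch (editor's addition, not part of the original module):
# a network-dependent test would typically guard itself like this in its
# test_main(); 'network' is one of the resource names regrtest understands.
def _example_requires_network():
    requires('network')    # raises ResourceDenied unless -u network was given
    # ... run the tests that need real network access here ...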

HOST = 'localhost'

def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding.  This is
    achieved by creating a temporary socket of the given family and type
    (default is AF_INET, SOCK_STREAM), and binding it to the host address
    HOST (default 'localhost') with the port set to 0, eliciting an unused
    ephemeral port from the OS.  The temporary socket is then closed and
    deleted, and the ephemeral port is returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is creating
    a Python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (e.g. the -accept argument to openssl's
    s_server mode).  Always prefer bind_port() over find_unused_port() where
    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports.  An EADDRINUSE
    socket.error will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports. When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Since most Open Source developers work on Unix
    rather than Windows, this is a common mistake.  A quick look over
    OpenSSL's 0.9.8g source shows that it uses SO_REUSEADDR when openssl.exe
    is called with the 's_server' option, for example.  See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close and delete our temporary socket but before our
    calling code has a chance to bind the returned port.  We can deal with this
    issue if/when we come across it."""
    tempsock = socket.socket(family, socktype)
    port = bind_port(tempsock)
    tempsock.close()
    del tempsock
    return port

def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number.  Relies on
    ephemeral ports in order to ensure we are using an unbound port.  This is
    important as many tests may be running simultaneously, especially in a
    buildbot environment.  This method raises an exception if the sock.family
    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
    for TCP/IP sockets.  The only case for setting these options is testing
    multicasting via multiple UDP sockets.

    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    on Windows), it will be set on the socket.  This will prevent anyone else
    from bind()'ing to our host/port for the duration of the test.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                raise TestFailed("tests should never set the SO_REUSEPORT "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port
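
# Illustrative sketch (editor's addition, not part of the original module):
# the common pattern for a test that needs a listening server socket, plus
# the fallback for handing a port number to an external program.
def _example_bind_port():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    port = bind_port(sock)              # bound to (HOST, <ephemeral port>)
    try:
        sock.listen(1)
        # ... connect a client to (HOST, port) and exercise the code ...
    finally:
        sock.close()
    return port

def _example_find_unused_port():
    # Only when bind_port() cannot be used directly (see the docstring
    # above), e.g. the port must appear on another program's command line.
    return find_unused_port()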

FUZZ = 1e-6

def fcmp(x, y): # fuzzy comparison function
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x-y) <= fuzz:
                return 0
        except:
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        for i in range(min(len(x), len(y))):
            outcome = fcmp(x[i], y[i])
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)

try:
    unicode
    have_unicode = True
except NameError:
    have_unicode = False

is_jython = sys.platform.startswith('java')

# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # Unicode names are only used if TESTFN_ENCODING exists for the platform.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (not hasattr(sys, "getwindowsversion") or
                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
            TESTFN_UNICODE_UNENCODEABLE = None
        else:
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # 'errors=ignore' mode - hence we get '?' characters rather than
                # the exception.  'Latin1' operates as we expect - ie, fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                print \
                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
                'Unicode filename tests may not be effective' \
                % TESTFN_UNICODE_UNENCODEABLE


# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())

# Save the initial cwd
SAVEDCWD = os.getcwd()

@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and sets it as the CWD.

    The new CWD is created in the current directory and it's named *name*.
    If *quiet* is False (default) and it's not possible to create or change
    the CWD, an error is raised.  If it's True, only a warning is raised
    and the original CWD is used.
    """
    if isinstance(name, unicode):
        try:
            name = name.encode(sys.getfilesystemencoding() or 'ascii')
        except UnicodeEncodeError:
            if not quiet:
                raise unittest.SkipTest('unable to encode the cwd name with '
                                        'the filesystem encoding.')
    saved_dir = os.getcwd()
    is_temporary = False
    try:
        os.mkdir(name)
        os.chdir(name)
        is_temporary = True
    except OSError:
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change the CWD to ' + name,
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(saved_dir)
        if is_temporary:
            rmtree(name)
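
# Illustrative sketch (editor's addition, not part of the original module):
# running part of a test inside a scratch working directory that is removed
# again afterwards.  The file name used here is arbitrary.
def _example_temp_cwd():
    with temp_cwd('tempcwd', quiet=True) as cwd:
        open(os.path.join(cwd, 'scratch.txt'), 'w').close()
    # back in the original working directory here; 'tempcwd' has been removed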


def findfile(file, here=__file__):
    """Try to find a file on sys.path and the working directory.  If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path)."""
    if os.path.isabs(file):
        return file
    path = sys.path
    path = [os.path.dirname(here)] + path
    for dn in path:
        fn = os.path.join(dn, file)
        if os.path.exists(fn): return fn
    return file

def sortdict(dict):
    "Like repr(dict), but in sorted order."
    items = dict.items()
    items.sort()
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas

def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.
    """
    file = open(TESTFN, "wb")
    try:
        return file.fileno()
    finally:
        file.close()
        unlink(TESTFN)

def check_syntax_error(testcase, statement):
    testcase.assertRaises(SyntaxError, compile, statement,
                          '<test string>', 'exec')

def open_urlresource(url):
    import urlparse, urllib2

    requires('urlfetch')
    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's a URL!

    fn = os.path.join(os.path.dirname(__file__), "data", filename)
    if os.path.exists(fn):
        return open(fn)

    print >> get_original_stdout(), '\tfetching %s ...' % url
    f = urllib2.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()
    return open(fn)


class WarningsRecorder(object):
    """Convenience wrapper for the warnings list returned on
       entry to the warnings.catch_warnings() context manager.
    """
    def __init__(self, warnings_list):
        self._warnings = warnings_list
        self._last = 0

    def __getattr__(self, attr):
        if len(self._warnings) > self._last:
            return getattr(self._warnings[-1], attr)
        elif attr in warnings.WarningMessage._WARNING_DETAILS:
            return None
        raise AttributeError("%r has no attribute %r" % (self, attr))

    @property
    def warnings(self):
        return self._warnings[self._last:]

    def reset(self):
        self._last = len(self._warnings)


def _filterwarnings(filters, quiet=False):
    """Catch the warnings, then check if all the expected
    warnings have been raised and re-raise unexpected warnings.
    If 'quiet' is True, only re-raise the unexpected warnings.
    """
    # Clear the warning registry of the calling module
    # in order to re-raise the warnings.
    frame = sys._getframe(2)
    registry = frame.f_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as w:
        # Disable filters, to record all warnings.  Because
        # test_warnings swaps the module, we need to look it up
        # in the sys.modules dictionary.
        sys.modules['warnings'].resetwarnings()
        yield WarningsRecorder(w)
    # Filter the recorded warnings
    reraise = [warning.message for warning in w]
    missing = []
    for msg, cat in filters:
        seen = False
        for exc in reraise[:]:
            message = str(exc)
            # Filter out the matching messages
            if (re.match(msg, message, re.I) and
                issubclass(exc.__class__, cat)):
                seen = True
                reraise.remove(exc)
        if not seen and not quiet:
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    for exc in reraise:
        raise AssertionError("unhandled warning %r" % exc)
    for filter in missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             filter)


@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Accept 2-tuples as positional arguments:
        ("message regexp", WarningCategory)

    Optional argument:
     - if 'quiet' is True, it does not fail if a filter catches nothing
        (default False)

    Without argument, it defaults to:
        check_warnings(("", Warning), quiet=False)
    """
    if not filters:
        filters = (("", Warning),)
    return _filterwarnings(filters, kwargs.get('quiet'))
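
# Illustrative sketch (editor's addition, not part of the original module):
# asserting that a block of code emits a DeprecationWarning whose message
# starts with a given pattern.  Unexpected warnings raise AssertionError.
def _example_check_warnings():
    with check_warnings(("deprecated", DeprecationWarning)):
        warnings.warn("deprecated API used", DeprecationWarning)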


@contextlib.contextmanager
def check_py3k_warnings(*filters, **kwargs):
    """Context manager to silence py3k warnings.

    Accept 2-tuples as positional arguments:
        ("message regexp", WarningCategory)

    Optional argument:
     - if 'quiet' is True, it does not fail if a filter catches nothing
        (default False)

    Without argument, it defaults to:
        check_py3k_warnings(("", DeprecationWarning), quiet=False)
    """
    if sys.py3kwarning:
        if not filters:
            filters = (("", DeprecationWarning),)
    else:
        # It should not raise any py3k warning
        filters = ()
    return _filterwarnings(filters, kwargs.get('quiet'))


class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            if module_name in sys.modules:
                module = sys.modules[module_name]
                # It is possible that module_name is just an alias for
                # another module (e.g. stub for modules renamed in 3.x).
                # In that case, we also need to delete the real module to
                # clear the import cache.
                if module.__name__ != module_name:
                    del sys.modules[module.__name__]
                del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)


class EnvironmentVarGuard(UserDict.DictMixin):

    """Class to help protect environment variables: changes made through it
    are recorded and the original values are restored on exit.  Can be used
    as a context manager."""

    def __init__(self):
        self._environ = os.environ
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for (k, v) in self._changed.items():
            if v is None:
                if k in self._environ:
                    del self._environ[k]
            else:
                self._environ[k] = v
        os.environ = self._environ
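
# Illustrative sketch (editor's addition, not part of the original module):
# temporarily overriding and removing environment variables; everything is
# restored when the 'with' block exits.  The variable names are examples.
def _example_environment_var_guard():
    with EnvironmentVarGuard() as env:
        env.set('LANG', 'C')
        env.unset('PYTHONSTARTUP')
        # code under test sees the modified environment here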


class DirsOnSysPath(object):
    """Context manager to temporarily add directories to sys.path.

    This makes a copy of sys.path, appends any directories given
    as positional arguments, then reverts sys.path to the copied
    settings when the context ends.

    Note that *all* sys.path modifications in the body of the
    context manager, including replacement of the object,
    will be reverted at the end of the block.
    """

    def __init__(self, *paths):
        self.original_value = sys.path[:]
        self.original_object = sys.path
        sys.path.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.path = self.original_object
        sys.path[:] = self.original_value


class TransientResource(object):

    """Context manager that raises ResourceDenied if an exception matching
    the specified exception type and attributes is raised while it is in
    effect."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        if type_ is not None and issubclass(self.exc, type_):
            for attr, attr_value in self.attrs.iteritems():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                raise ResourceDenied("an optional resource is not available")


@contextlib.contextmanager
def transient_internet():
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions."""
    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
    with time_out, socket_peer_reset, ioerror_peer_reset:
        yield
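
# Illustrative sketch (editor's addition, not part of the original module):
# turning flaky connectivity failures (timeouts, connection resets) into a
# ResourceDenied skip rather than a test error.  The URL is an example.
def _example_transient_internet():
    import urllib2
    with transient_internet():
        return urllib2.urlopen("http://www.example.com/", timeout=15).read()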


@contextlib.contextmanager
def captured_output(stream_name):
    """Run the 'with' statement body using a StringIO object in place of a
    specific attribute on the sys module.
    Example use (with 'stream_name=stdout')::

       with captured_stdout() as s:
           print "hello"
       assert s.getvalue() == "hello\\n"
    """
    import StringIO
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StringIO.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)

def captured_stdout():
    return captured_output("stdout")

def captured_stdin():
    return captured_output("stdin")

def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case when reference cycles are involved.)  This means that
    __del__ methods may be called later than expected and weakrefs may remain
    alive for longer than expected.  This function tries its best to force all
    garbage objects to disappear.
    """
    gc.collect()
    gc.collect()
    gc.collect()


#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

def run_with_locale(catstr, *locales):
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except:
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        inner.func_name = func.func_name
        inner.__doc__ = func.__doc__
        return inner
    return decorator
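
# Illustrative sketch (editor's addition, not part of the original module):
# decorating a helper so it runs under the first of several locales that the
# platform accepts ('' means the user's default); if none can be set, the
# current locale is simply left in place.
@run_with_locale('LC_ALL', 'fr_FR.UTF-8', 'fr_FR', 'de_DE', '')
def _example_run_with_locale():
    import locale
    return locale.str(3.14)      # locale-aware formatting, e.g. '3,14'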

#=======================================================================
# Big-memory-test support.  Separate from 'resources' because memory use
# should be configurable.

# Some handy shorthands.  Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    real_max_memuse = memlimit
    if memlimit > MAX_Py_ssize_t:
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit
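
# Illustrative sketch (editor's addition, not part of the original module):
# regrtest passes the value of its -M option to set_memlimit(), which fills
# in the module-level limits used by the bigmem decorators below.
def _example_set_memlimit():
    set_memlimit('2.5G')         # also accepts forms like '2048m' or '4Gb'
    return max_memuse, real_max_memuse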

def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator
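
# Illustrative sketch (editor's addition, not part of the original module):
# a bigmem test method.  The decorator computes a suitable 'size' from
# minsize/memuse and the configured memory limit and passes it in; the
# test class below is hypothetical.
class _ExampleBigmemTest(unittest.TestCase):
    # memuse=2 is an estimate: roughly two bytes are needed per unit of
    # 'size' (one for the result string, plus headroom).
    @bigmemtest(minsize=_2G, memuse=2)
    def test_str_repeat(self, size):
        s = '-' * size
        self.assertEqual(len(s), size)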

def precisionbigmemtest(size, memuse, overhead=5*_1M):
    def decorator(f):
        def wrapper(self):
            if not real_max_memuse:
                maxsize = 5147
            else:
                maxsize = size

                if real_max_memuse and real_max_memuse < maxsize * memuse:
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return

            return f(self, maxsize)
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if verbose:
                sys.stderr.write("Skipping %s because of memory "
                                 "constraint\n" % (f.__name__,))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.

class BasicTestRunner:
    def run(self, test):
        result = unittest.TestResult()
        test(result)
        return result

def _id(obj):
    return obj

def requires_resource(resource):
    if is_resource_enabled(resource):
        return _id
    else:
        return unittest.skip("resource {0!r} is not enabled".format(resource))

def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    return impl_detail(cpython=True)(test)

def impl_detail(msg=None, **guards):
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            msg = "implementation detail not available on {0}"
        else:
            msg = "implementation detail specific to {0}"
        guardnames = sorted(guardnames.keys())
        msg = msg.format(' or '.join(guardnames))
    return unittest.skip(msg)

def _parse_guards(guards):
    # Returns a tuple ({platform_name: run_me}, default_value)
    if not guards:
        return ({'cpython': True}, False)
    is_true = guards.values()[0]
    assert guards.values() == [is_true] * len(guards)   # all True or all False
    return (guards, not is_true)

# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    return guards.get(platform.python_implementation().lower(), default)



def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class."""
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if not result.wasSuccessful():
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            err = "multiple errors occurred"
            if not verbose:
                err += "; run in verbose mode for details"
        raise TestFailed(err)


def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes."""
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(unittest.findTestCases(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))
    _run_suite(suite)
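
# Illustrative sketch (editor's addition, not part of the original module):
# the usual test_main() pattern built on run_unittest().  The test case
# defined here is purely illustrative.
def _example_test_main():
    class ExampleTests(unittest.TestCase):
        def test_truth(self):
            self.assertTrue(True)
    run_unittest(ExampleTests)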


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    test_support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).
    """

    import doctest

    if verbosity is None:
        verbosity = verbose
    else:
        verbosity = None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = save_stdout
    if verbose:
        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
    return f, t

#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R

# NOTE: we use thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its __bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# thread._count(), on the other hand, only gets decremented *after* the
# __bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.

def threading_setup():
    import thread
    return thread._count(),

def threading_cleanup(nb_threads):
    import thread
    import time

    _MAX_COUNT = 10
    for count in range(_MAX_COUNT):
        n = thread._count()
        if n == nb_threads:
            break
        time.sleep(0.1)
    # XXX print a warning in case of failure?

def reap_threads(func):
    @functools.wraps(func)
    def decorator(*args):
        key = threading_setup()
        try:
            return func(*args)
        finally:
            threading_cleanup(*key)
    return decorator

def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                # This will raise an exception on Windows.  That's ok.
                pid, status = os.waitpid(any_process, os.WNOHANG)
                if pid == 0:
                    break
            except:
                break

def py3k_bytes(b):
    """Emulate the py3k bytes() constructor.

    NOTE: This is only a best effort function.
    """
    try:
        # memoryview?
        return b.tobytes()
    except AttributeError:
        try:
            # iterable of ints?
            return b"".join(chr(x) for x in b)
        except TypeError:
            return bytes(b)
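
# Illustrative sketch (editor's addition, not part of the original module):
# py3k_bytes() lets 2.x test code mirror 3.x bytes() calls.
def _example_py3k_bytes():
    assert py3k_bytes(b"abc") == b"abc"
    assert py3k_bytes([97, 98, 99]) == b"abc"
    assert py3k_bytes(bytearray(b"abc")) == b"abc"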
1110