test_support.py revision e7d8be80ba634fa15ece6f503c33592e0d333361
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.test_support':
4    raise ImportError('test_support must be imported from the test package')
5
6import contextlib
7import errno
8import socket
9import sys
10import os
11import shutil
12import warnings
13import unittest
14
15__all__ = ["Error", "TestFailed", "TestSkipped", "ResourceDenied", "import_module",
16           "verbose", "use_resources", "max_memuse", "record_original_stdout",
17           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
18           "is_resource_enabled", "requires", "find_unused_port", "bind_port",
19           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
20           "findfile", "verify", "vereq", "sortdict", "check_syntax_error",
21           "open_urlresource", "WarningMessage", "catch_warning", "CleanImport",
22           "EnvironmentVarGuard", "captured_output",
23           "captured_stdout", "TransientResource", "transient_internet",
24           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
25           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
26           "threading_cleanup", "reap_children"]
27
28class Error(Exception):
29    """Base class for regression test exceptions."""
30
31class TestFailed(Error):
32    """Test failed."""
33
34class TestSkipped(Error):
35    """Test skipped.
36
37    This can be raised to indicate that a test was deliberatly
38    skipped, but not because a feature wasn't available.  For
39    example, if some resource can't be used, such as the network
40    appears to be unavailable, this should be raised instead of
41    TestFailed.
42    """
43
44class ResourceDenied(TestSkipped):
45    """Test skipped because it requested a disallowed resource.
46
47    This is raised when a test calls requires() for a resource that
48    has not be enabled.  It is used to distinguish between expected
49    and unexpected skips.
50    """
51
52def import_module(name, deprecated=False):
53    """Import the module to be tested, raising TestSkipped if it is not
54    available."""
55    with catch_warning(record=False):
56        if deprecated:
57            warnings.filterwarnings("ignore", ".+ (module|package)",
58                                    DeprecationWarning)
59        try:
60            module = __import__(name, level=0)
61        except ImportError:
62            raise TestSkipped("No module named " + name)
63        else:
64            return module
65
66verbose = 1              # Flag set to 0 by regrtest.py
67use_resources = None     # Flag set to [] by regrtest.py
68max_memuse = 0           # Disable bigmem tests (they will still be run with
69                         # small sizes, to make sure they work.)
70real_max_memuse = 0
71
72# _original_stdout is meant to hold stdout at the time regrtest began.
73# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
74# The point is to have some flavor of stdout the user can actually see.
75_original_stdout = None
76def record_original_stdout(stdout):
77    global _original_stdout
78    _original_stdout = stdout
79
80def get_original_stdout():
81    return _original_stdout or sys.stdout
82
83def unload(name):
84    try:
85        del sys.modules[name]
86    except KeyError:
87        pass
88
89def unlink(filename):
90    try:
91        os.unlink(filename)
92    except OSError:
93        pass
94
95def rmtree(path):
96    try:
97        shutil.rmtree(path)
98    except OSError, e:
99        # Unix returns ENOENT, Windows returns ESRCH.
100        if e.errno not in (errno.ENOENT, errno.ESRCH):
101            raise
102
103def forget(modname):
104    '''"Forget" a module was ever imported by removing it from sys.modules and
105    deleting any .pyc and .pyo files.'''
106    unload(modname)
107    for dirname in sys.path:
108        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
109        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
110        # the chance exists that there is no .pyc (and thus the 'try' statement
111        # is exited) but there is a .pyo file.
112        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
113
114def is_resource_enabled(resource):
115    """Test whether a resource is enabled.  Known resources are set by
116    regrtest.py."""
117    return use_resources is not None and resource in use_resources
118
119def requires(resource, msg=None):
120    """Raise ResourceDenied if the specified resource is not available.
121
122    If the caller's module is __main__ then automatically return True.  The
123    possibility of False being returned occurs when regrtest.py is executing."""
124    # see if the caller's module is __main__ - if so, treat as if
125    # the resource was set
126    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
127        return
128    if not is_resource_enabled(resource):
129        if msg is None:
130            msg = "Use of the `%s' resource not enabled" % resource
131        raise ResourceDenied(msg)
132
133HOST = 'localhost'
134
135def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
136    """Returns an unused port that should be suitable for binding.  This is
137    achieved by creating a temporary socket with the same family and type as
138    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
139    the specified host address (defaults to 0.0.0.0) with the port set to 0,
140    eliciting an unused ephemeral port from the OS.  The temporary socket is
141    then closed and deleted, and the ephemeral port is returned.
142
143    Either this method or bind_port() should be used for any tests where a
144    server socket needs to be bound to a particular port for the duration of
145    the test.  Which one to use depends on whether the calling code is creating
146    a python socket, or if an unused port needs to be provided in a constructor
147    or passed to an external program (i.e. the -accept argument to openssl's
148    s_server mode).  Always prefer bind_port() over find_unused_port() where
149    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
150    socket is bound to a hard coded port, the ability to run multiple instances
151    of the test simultaneously on the same host is compromised, which makes the
152    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
153    may simply manifest as a failed test, which can be recovered from without
154    intervention in most cases, but on Windows, the entire python process can
155    completely and utterly wedge, requiring someone to log in to the buildbot
156    and manually kill the affected process.
157
158    (This is easy to reproduce on Windows, unfortunately, and can be traced to
159    the SO_REUSEADDR socket option having different semantics on Windows versus
160    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
161    listen and then accept connections on identical host/ports.  An EADDRINUSE
162    socket.error will be raised at some point (depending on the platform and
163    the order bind and listen were called on each socket).
164
165    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
166    will ever be raised when attempting to bind two identical host/ports. When
167    accept() is called on each socket, the second caller's process will steal
168    the port from the first caller, leaving them both in an awkwardly wedged
169    state where they'll no longer respond to any signals or graceful kills, and
170    must be forcibly killed via OpenProcess()/TerminateProcess().
171
172    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
173    instead of SO_REUSEADDR, which effectively affords the same semantics as
174    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
175    Source world compared to Windows ones, this is a common mistake.  A quick
176    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
177    openssl.exe is called with the 's_server' option, for example. See
178    http://bugs.python.org/issue2550 for more info.  The following site also
179    has a very thorough description about the implications of both REUSEADDR
180    and EXCLUSIVEADDRUSE on Windows:
181    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
182
183    XXX: although this approach is a vast improvement on previous attempts to
184    elicit unused ports, it rests heavily on the assumption that the ephemeral
185    port returned to us by the OS won't immediately be dished back out to some
186    other process when we close and delete our temporary socket but before our
187    calling code has a chance to bind the returned port.  We can deal with this
188    issue if/when we come across it."""
189    tempsock = socket.socket(family, socktype)
190    port = bind_port(tempsock)
191    tempsock.close()
192    del tempsock
193    return port
194
195def bind_port(sock, host=HOST):
196    """Bind the socket to a free port and return the port number.  Relies on
197    ephemeral ports in order to ensure we are using an unbound port.  This is
198    important as many tests may be running simultaneously, especially in a
199    buildbot environment.  This method raises an exception if the sock.family
200    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
201    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
202    for TCP/IP sockets.  The only case for setting these options is testing
203    multicasting via multiple UDP sockets.
204
205    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
206    on Windows), it will be set on the socket.  This will prevent anyone else
207    from bind()'ing to our host/port for the duration of the test.
208    """
209    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
210        if hasattr(socket, 'SO_REUSEADDR'):
211            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
212                raise TestFailed("tests should never set the SO_REUSEADDR "   \
213                                 "socket option on TCP/IP sockets!")
214        if hasattr(socket, 'SO_REUSEPORT'):
215            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
216                raise TestFailed("tests should never set the SO_REUSEPORT "   \
217                                 "socket option on TCP/IP sockets!")
218        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
219            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
220
221    sock.bind((host, 0))
222    port = sock.getsockname()[1]
223    return port
224
225FUZZ = 1e-6
226
227def fcmp(x, y): # fuzzy comparison function
228    if isinstance(x, float) or isinstance(y, float):
229        try:
230            fuzz = (abs(x) + abs(y)) * FUZZ
231            if abs(x-y) <= fuzz:
232                return 0
233        except:
234            pass
235    elif type(x) == type(y) and isinstance(x, (tuple, list)):
236        for i in range(min(len(x), len(y))):
237            outcome = fcmp(x[i], y[i])
238            if outcome != 0:
239                return outcome
240        return (len(x) > len(y)) - (len(x) < len(y))
241    return (x > y) - (x < y)
242
243try:
244    unicode
245    have_unicode = True
246except NameError:
247    have_unicode = False
248
249is_jython = sys.platform.startswith('java')
250
251# Filename used for testing
252if os.name == 'java':
253    # Jython disallows @ in module names
254    TESTFN = '$test'
255elif os.name == 'riscos':
256    TESTFN = 'testfile'
257else:
258    TESTFN = '@test'
259    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
260    if have_unicode:
261        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
262        # TESTFN_UNICODE is a filename that can be encoded using the
263        # file system encoding, but *not* with the default (ascii) encoding
264        if isinstance('', unicode):
265            # python -U
266            # XXX perhaps unicode() should accept Unicode strings?
267            TESTFN_UNICODE = "@test-\xe0\xf2"
268        else:
269            # 2 latin characters.
270            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
271        TESTFN_ENCODING = sys.getfilesystemencoding()
272        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
273        # able to be encoded by *either* the default or filesystem encoding.
274        # This test really only makes sense on Windows NT platforms
275        # which have special Unicode support in posixmodule.
276        if (not hasattr(sys, "getwindowsversion") or
277                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
278            TESTFN_UNICODE_UNENCODEABLE = None
279        else:
280            # Japanese characters (I think - from bug 846133)
281            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
282            try:
283                # XXX - Note - should be using TESTFN_ENCODING here - but for
284                # Windows, "mbcs" currently always operates as if in
285                # errors=ignore' mode - hence we get '?' characters rather than
286                # the exception.  'Latin1' operates as we expect - ie, fails.
287                # See [ 850997 ] mbcs encoding ignores errors
288                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
289            except UnicodeEncodeError:
290                pass
291            else:
292                print \
293                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
294                'Unicode filename tests may not be effective' \
295                % TESTFN_UNICODE_UNENCODEABLE
296
297# Make sure we can write to TESTFN, try in /tmp if we can't
298fp = None
299try:
300    fp = open(TESTFN, 'w+')
301except IOError:
302    TMP_TESTFN = os.path.join('/tmp', TESTFN)
303    try:
304        fp = open(TMP_TESTFN, 'w+')
305        TESTFN = TMP_TESTFN
306        del TMP_TESTFN
307    except IOError:
308        print ('WARNING: tests will fail, unable to write to: %s or %s' %
309                (TESTFN, TMP_TESTFN))
310if fp is not None:
311    fp.close()
312    unlink(TESTFN)
313del fp
314
315def findfile(file, here=__file__):
316    """Try to find a file on sys.path and the working directory.  If it is not
317    found the argument passed to the function is returned (this does not
318    necessarily signal failure; could still be the legitimate path)."""
319    if os.path.isabs(file):
320        return file
321    path = sys.path
322    path = [os.path.dirname(here)] + path
323    for dn in path:
324        fn = os.path.join(dn, file)
325        if os.path.exists(fn): return fn
326    return file
327
328def verify(condition, reason='test failed'):
329    """Verify that condition is true. If not, raise TestFailed.
330
331       The optional argument reason can be given to provide
332       a better error text.
333    """
334
335    if not condition:
336        raise TestFailed(reason)
337
338def vereq(a, b):
339    """Raise TestFailed if a == b is false.
340
341    This is better than verify(a == b) because, in case of failure, the
342    error message incorporates repr(a) and repr(b) so you can see the
343    inputs.
344
345    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
346    former is tested.
347    """
348
349    if not (a == b):
350        raise TestFailed("%r == %r" % (a, b))
351
352def sortdict(dict):
353    "Like repr(dict), but in sorted order."
354    items = dict.items()
355    items.sort()
356    reprpairs = ["%r: %r" % pair for pair in items]
357    withcommas = ", ".join(reprpairs)
358    return "{%s}" % withcommas
359
360def check_syntax_error(testcase, statement):
361    try:
362        compile(statement, '<test string>', 'exec')
363    except SyntaxError:
364        pass
365    else:
366        testcase.fail('Missing SyntaxError: "%s"' % statement)
367
368def open_urlresource(url):
369    import urllib, urlparse
370
371    requires('urlfetch')
372    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
373
374    for path in [os.path.curdir, os.path.pardir]:
375        fn = os.path.join(path, filename)
376        if os.path.exists(fn):
377            return open(fn)
378
379    print >> get_original_stdout(), '\tfetching %s ...' % url
380    fn, _ = urllib.urlretrieve(url, filename)
381    return open(fn)
382
383
384class WarningMessage(object):
385    "Holds the result of a single showwarning() call"
386    _WARNING_DETAILS = "message category filename lineno line".split()
387    def __init__(self, message, category, filename, lineno, line=None):
388        for attr in self._WARNING_DETAILS:
389            setattr(self, attr, locals()[attr])
390        self._category_name = category.__name__ if category else None
391
392    def __str__(self):
393        return ("{message : %r, category : %r, filename : %r, lineno : %s, "
394                    "line : %r}" % (self.message, self._category_name,
395                                    self.filename, self.lineno, self.line))
396
397class WarningRecorder(object):
398    "Records the result of any showwarning calls"
399    def __init__(self):
400        self.warnings = []
401        self._set_last(None)
402
403    def _showwarning(self, message, category, filename, lineno,
404                    file=None, line=None):
405        wm = WarningMessage(message, category, filename, lineno, line)
406        self.warnings.append(wm)
407        self._set_last(wm)
408
409    def _set_last(self, last_warning):
410        if last_warning is None:
411            for attr in WarningMessage._WARNING_DETAILS:
412                setattr(self, attr, None)
413        else:
414            for attr in WarningMessage._WARNING_DETAILS:
415                setattr(self, attr, getattr(last_warning, attr))
416
417    def reset(self):
418        self.warnings = []
419        self._set_last(None)
420
421    def __str__(self):
422        return '[%s]' % (', '.join(map(str, self.warnings)))
423
424@contextlib.contextmanager
425def catch_warning(module=warnings, record=True):
426    """Guard the warnings filter from being permanently changed and
427    optionally record the details of any warnings that are issued.
428
429    Use like this:
430
431        with catch_warning() as w:
432            warnings.warn("foo")
433            assert str(w.message) == "foo"
434    """
435    original_filters = module.filters
436    original_showwarning = module.showwarning
437    if record:
438        recorder = WarningRecorder()
439        module.showwarning = recorder._showwarning
440    else:
441        recorder = None
442    try:
443        # Replace the filters with a copy of the original
444        module.filters = module.filters[:]
445        yield recorder
446    finally:
447        module.showwarning = original_showwarning
448        module.filters = original_filters
449
450
451class CleanImport(object):
452    """Context manager to force import to return a new module reference.
453
454    This is useful for testing module-level behaviours, such as
455    the emission of a DeprecationWarning on import.
456
457    Use like this:
458
459        with CleanImport("foo"):
460            __import__("foo") # new reference
461    """
462
463    def __init__(self, *module_names):
464        self.original_modules = sys.modules.copy()
465        for module_name in module_names:
466            if module_name in sys.modules:
467                module = sys.modules[module_name]
468                # It is possible that module_name is just an alias for
469                # another module (e.g. stub for modules renamed in 3.x).
470                # In that case, we also need delete the real module to clear
471                # the import cache.
472                if module.__name__ != module_name:
473                    del sys.modules[module.__name__]
474                del sys.modules[module_name]
475
476    def __enter__(self):
477        return self
478
479    def __exit__(self, *ignore_exc):
480        sys.modules.update(self.original_modules)
481
482
483class EnvironmentVarGuard(object):
484
485    """Class to help protect the environment variable properly.  Can be used as
486    a context manager."""
487
488    def __init__(self):
489        self._environ = os.environ
490        self._unset = set()
491        self._reset = dict()
492
493    def set(self, envvar, value):
494        if envvar not in self._environ:
495            self._unset.add(envvar)
496        else:
497            self._reset[envvar] = self._environ[envvar]
498        self._environ[envvar] = value
499
500    def unset(self, envvar):
501        if envvar in self._environ:
502            self._reset[envvar] = self._environ[envvar]
503            del self._environ[envvar]
504
505    def __enter__(self):
506        return self
507
508    def __exit__(self, *ignore_exc):
509        for envvar, value in self._reset.iteritems():
510            self._environ[envvar] = value
511        for unset in self._unset:
512            del self._environ[unset]
513
514class TransientResource(object):
515
516    """Raise ResourceDenied if an exception is raised while the context manager
517    is in effect that matches the specified exception and attributes."""
518
519    def __init__(self, exc, **kwargs):
520        self.exc = exc
521        self.attrs = kwargs
522
523    def __enter__(self):
524        return self
525
526    def __exit__(self, type_=None, value=None, traceback=None):
527        """If type_ is a subclass of self.exc and value has attributes matching
528        self.attrs, raise ResourceDenied.  Otherwise let the exception
529        propagate (if any)."""
530        if type_ is not None and issubclass(self.exc, type_):
531            for attr, attr_value in self.attrs.iteritems():
532                if not hasattr(value, attr):
533                    break
534                if getattr(value, attr) != attr_value:
535                    break
536            else:
537                raise ResourceDenied("an optional resource is not available")
538
539
540def transient_internet():
541    """Return a context manager that raises ResourceDenied when various issues
542    with the Internet connection manifest themselves as exceptions."""
543    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
544    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
545    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
546    return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
547
548
549@contextlib.contextmanager
550def captured_output(stream_name):
551    """Run the 'with' statement body using a StringIO object in place of a
552    specific attribute on the sys module.
553    Example use (with 'stream_name=stdout')::
554
555       with captured_stdout() as s:
556           print "hello"
557       assert s.getvalue() == "hello"
558    """
559    import StringIO
560    orig_stdout = getattr(sys, stream_name)
561    setattr(sys, stream_name, StringIO.StringIO())
562    try:
563        yield getattr(sys, stream_name)
564    finally:
565        setattr(sys, stream_name, orig_stdout)
566
567def captured_stdout():
568    return captured_output("stdout")
569
570
571#=======================================================================
572# Decorator for running a function in a different locale, correctly resetting
573# it afterwards.
574
575def run_with_locale(catstr, *locales):
576    def decorator(func):
577        def inner(*args, **kwds):
578            try:
579                import locale
580                category = getattr(locale, catstr)
581                orig_locale = locale.setlocale(category)
582            except AttributeError:
583                # if the test author gives us an invalid category string
584                raise
585            except:
586                # cannot retrieve original locale, so do nothing
587                locale = orig_locale = None
588            else:
589                for loc in locales:
590                    try:
591                        locale.setlocale(category, loc)
592                        break
593                    except:
594                        pass
595
596            # now run the function, resetting the locale on exceptions
597            try:
598                return func(*args, **kwds)
599            finally:
600                if locale and orig_locale:
601                    locale.setlocale(category, orig_locale)
602        inner.func_name = func.func_name
603        inner.__doc__ = func.__doc__
604        return inner
605    return decorator
606
607#=======================================================================
608# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
609
610# Some handy shorthands. Note that these are used for byte-limits as well
611# as size-limits, in the various bigmem tests
612_1M = 1024*1024
613_1G = 1024 * _1M
614_2G = 2 * _1G
615_4G = 4 * _1G
616
617MAX_Py_ssize_t = sys.maxsize
618
619def set_memlimit(limit):
620    import re
621    global max_memuse
622    global real_max_memuse
623    sizes = {
624        'k': 1024,
625        'm': _1M,
626        'g': _1G,
627        't': 1024*_1G,
628    }
629    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
630                 re.IGNORECASE | re.VERBOSE)
631    if m is None:
632        raise ValueError('Invalid memory limit %r' % (limit,))
633    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
634    real_max_memuse = memlimit
635    if memlimit > MAX_Py_ssize_t:
636        memlimit = MAX_Py_ssize_t
637    if memlimit < _2G - 1:
638        raise ValueError('Memory limit %r too low to be useful' % (limit,))
639    max_memuse = memlimit
640
641def bigmemtest(minsize, memuse, overhead=5*_1M):
642    """Decorator for bigmem tests.
643
644    'minsize' is the minimum useful size for the test (in arbitrary,
645    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
646    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
647    independent of the testsize, and defaults to 5Mb.
648
649    The decorator tries to guess a good value for 'size' and passes it to
650    the decorated test function. If minsize * memuse is more than the
651    allowed memory use (as defined by max_memuse), the test is skipped.
652    Otherwise, minsize is adjusted upward to use up to max_memuse.
653    """
654    def decorator(f):
655        def wrapper(self):
656            if not max_memuse:
657                # If max_memuse is 0 (the default),
658                # we still want to run the tests with size set to a few kb,
659                # to make sure they work. We still want to avoid using
660                # too much memory, though, but we do that noisily.
661                maxsize = 5147
662                self.failIf(maxsize * memuse + overhead > 20 * _1M)
663            else:
664                maxsize = int((max_memuse - overhead) / memuse)
665                if maxsize < minsize:
666                    # Really ought to print 'test skipped' or something
667                    if verbose:
668                        sys.stderr.write("Skipping %s because of memory "
669                                         "constraint\n" % (f.__name__,))
670                    return
671                # Try to keep some breathing room in memory use
672                maxsize = max(maxsize - 50 * _1M, minsize)
673            return f(self, maxsize)
674        wrapper.minsize = minsize
675        wrapper.memuse = memuse
676        wrapper.overhead = overhead
677        return wrapper
678    return decorator
679
680def precisionbigmemtest(size, memuse, overhead=5*_1M):
681    def decorator(f):
682        def wrapper(self):
683            if not real_max_memuse:
684                maxsize = 5147
685            else:
686                maxsize = size
687
688                if real_max_memuse and real_max_memuse < maxsize * memuse:
689                    if verbose:
690                        sys.stderr.write("Skipping %s because of memory "
691                                         "constraint\n" % (f.__name__,))
692                    return
693
694            return f(self, maxsize)
695        wrapper.size = size
696        wrapper.memuse = memuse
697        wrapper.overhead = overhead
698        return wrapper
699    return decorator
700
701def bigaddrspacetest(f):
702    """Decorator for tests that fill the address space."""
703    def wrapper(self):
704        if max_memuse < MAX_Py_ssize_t:
705            if verbose:
706                sys.stderr.write("Skipping %s because of memory "
707                                 "constraint\n" % (f.__name__,))
708        else:
709            return f(self)
710    return wrapper
711
712#=======================================================================
713# unittest integration.
714
715class BasicTestRunner:
716    def run(self, test):
717        result = unittest.TestResult()
718        test(result)
719        return result
720
721
722def _run_suite(suite):
723    """Run tests from a unittest.TestSuite-derived class."""
724    if verbose:
725        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
726    else:
727        runner = BasicTestRunner()
728
729    result = runner.run(suite)
730    if not result.wasSuccessful():
731        if len(result.errors) == 1 and not result.failures:
732            err = result.errors[0][1]
733        elif len(result.failures) == 1 and not result.errors:
734            err = result.failures[0][1]
735        else:
736            err = "errors occurred; run in verbose mode for details"
737        raise TestFailed(err)
738
739
740def run_unittest(*classes):
741    """Run tests from unittest.TestCase-derived classes."""
742    valid_types = (unittest.TestSuite, unittest.TestCase)
743    suite = unittest.TestSuite()
744    for cls in classes:
745        if isinstance(cls, str):
746            if cls in sys.modules:
747                suite.addTest(unittest.findTestCases(sys.modules[cls]))
748            else:
749                raise ValueError("str arguments must be keys in sys.modules")
750        elif isinstance(cls, valid_types):
751            suite.addTest(cls)
752        else:
753            suite.addTest(unittest.makeSuite(cls))
754    _run_suite(suite)
755
756
757#=======================================================================
758# doctest driver.
759
760def run_doctest(module, verbosity=None):
761    """Run doctest on the given module.  Return (#failures, #tests).
762
763    If optional argument verbosity is not specified (or is None), pass
764    test_support's belief about verbosity on to doctest.  Else doctest's
765    usual behavior is used (it searches sys.argv for -v).
766    """
767
768    import doctest
769
770    if verbosity is None:
771        verbosity = verbose
772    else:
773        verbosity = None
774
775    # Direct doctest output (normally just errors) to real stdout; doctest
776    # output shouldn't be compared by regrtest.
777    save_stdout = sys.stdout
778    sys.stdout = get_original_stdout()
779    try:
780        f, t = doctest.testmod(module, verbose=verbosity)
781        if f:
782            raise TestFailed("%d of %d doctests failed" % (f, t))
783    finally:
784        sys.stdout = save_stdout
785    if verbose:
786        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
787    return f, t
788
789#=======================================================================
790# Threading support to prevent reporting refleaks when running regrtest.py -R
791
792def threading_setup():
793    import threading
794    return len(threading._active), len(threading._limbo)
795
796def threading_cleanup(num_active, num_limbo):
797    import threading
798    import time
799
800    _MAX_COUNT = 10
801    count = 0
802    while len(threading._active) != num_active and count < _MAX_COUNT:
803        count += 1
804        time.sleep(0.1)
805
806    count = 0
807    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
808        count += 1
809        time.sleep(0.1)
810
811def reap_children():
812    """Use this function at the end of test_main() whenever sub-processes
813    are started.  This will help ensure that no extra children (zombies)
814    stick around to hog resources and create problems when looking
815    for refleaks.
816    """
817
818    # Reap all our dead child processes so we don't leave zombies around.
819    # These hog resources and might be causing some of the buildbots to die.
820    if hasattr(os, 'waitpid'):
821        any_process = -1
822        while True:
823            try:
824                # This will raise an exception on Windows.  That's ok.
825                pid, status = os.waitpid(any_process, os.WNOHANG)
826                if pid == 0:
827                    break
828            except:
829                break
830