test_support.py revision 9d4418242757f616ca7041409c7015e166b2c9f9
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.test_support':
4    raise ImportError('test_support must be imported from the test package')
5
6import contextlib
7import errno
8import socket
9import sys
10import os
11import os.path
12import shutil
13import warnings
14import unittest
15
16class Error(Exception):
17    """Base class for regression test exceptions."""
18
19class TestFailed(Error):
20    """Test failed."""
21
22class TestSkipped(Error):
23    """Test skipped.
24
25    This can be raised to indicate that a test was deliberatly
26    skipped, but not because a feature wasn't available.  For
27    example, if some resource can't be used, such as the network
28    appears to be unavailable, this should be raised instead of
29    TestFailed.
30    """
31
32class ResourceDenied(TestSkipped):
33    """Test skipped because it requested a disallowed resource.
34
35    This is raised when a test calls requires() for a resource that
36    has not be enabled.  It is used to distinguish between expected
37    and unexpected skips.
38    """
39
40def import_module(name, deprecated=False):
41    """Import the module to be tested, raising TestSkipped if it is not
42    available."""
43    with catch_warning():
44        if deprecated:
45            warnings.filterwarnings("ignore", ".+ module", DeprecationWarning)
46        try:
47            module = __import__(name, level=0)
48        except ImportError:
49            raise TestSkipped("No module named " + name)
50        else:
51            return module
52
53verbose = 1              # Flag set to 0 by regrtest.py
54use_resources = None     # Flag set to [] by regrtest.py
55max_memuse = 0           # Disable bigmem tests (they will still be run with
56                         # small sizes, to make sure they work.)
57
58# _original_stdout is meant to hold stdout at the time regrtest began.
59# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
60# The point is to have some flavor of stdout the user can actually see.
61_original_stdout = None
62def record_original_stdout(stdout):
63    global _original_stdout
64    _original_stdout = stdout
65
66def get_original_stdout():
67    return _original_stdout or sys.stdout
68
69def unload(name):
70    try:
71        del sys.modules[name]
72    except KeyError:
73        pass
74
75def unlink(filename):
76    try:
77        os.unlink(filename)
78    except OSError:
79        pass
80
81def rmtree(path):
82    try:
83        shutil.rmtree(path)
84    except OSError, e:
85        # Unix returns ENOENT, Windows returns ESRCH.
86        if e.errno not in (errno.ENOENT, errno.ESRCH):
87            raise
88
89def forget(modname):
90    '''"Forget" a module was ever imported by removing it from sys.modules and
91    deleting any .pyc and .pyo files.'''
92    unload(modname)
93    for dirname in sys.path:
94        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
95        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
96        # the chance exists that there is no .pyc (and thus the 'try' statement
97        # is exited) but there is a .pyo file.
98        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
99
100def is_resource_enabled(resource):
101    """Test whether a resource is enabled.  Known resources are set by
102    regrtest.py."""
103    return use_resources is not None and resource in use_resources
104
105def requires(resource, msg=None):
106    """Raise ResourceDenied if the specified resource is not available.
107
108    If the caller's module is __main__ then automatically return True.  The
109    possibility of False being returned occurs when regrtest.py is executing."""
110    # see if the caller's module is __main__ - if so, treat as if
111    # the resource was set
112    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
113        return
114    if not is_resource_enabled(resource):
115        if msg is None:
116            msg = "Use of the `%s' resource not enabled" % resource
117        raise ResourceDenied(msg)
118
119HOST = 'localhost'
120
121def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
122    """Returns an unused port that should be suitable for binding.  This is
123    achieved by creating a temporary socket with the same family and type as
124    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
125    the specified host address (defaults to 0.0.0.0) with the port set to 0,
126    eliciting an unused ephemeral port from the OS.  The temporary socket is
127    then closed and deleted, and the ephemeral port is returned.
128
129    Either this method or bind_port() should be used for any tests where a
130    server socket needs to be bound to a particular port for the duration of
131    the test.  Which one to use depends on whether the calling code is creating
132    a python socket, or if an unused port needs to be provided in a constructor
133    or passed to an external program (i.e. the -accept argument to openssl's
134    s_server mode).  Always prefer bind_port() over find_unused_port() where
135    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
136    socket is bound to a hard coded port, the ability to run multiple instances
137    of the test simultaneously on the same host is compromised, which makes the
138    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
139    may simply manifest as a failed test, which can be recovered from without
140    intervention in most cases, but on Windows, the entire python process can
141    completely and utterly wedge, requiring someone to log in to the buildbot
142    and manually kill the affected process.
143
144    (This is easy to reproduce on Windows, unfortunately, and can be traced to
145    the SO_REUSEADDR socket option having different semantics on Windows versus
146    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
147    listen and then accept connections on identical host/ports.  An EADDRINUSE
148    socket.error will be raised at some point (depending on the platform and
149    the order bind and listen were called on each socket).
150
151    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
152    will ever be raised when attempting to bind two identical host/ports. When
153    accept() is called on each socket, the second caller's process will steal
154    the port from the first caller, leaving them both in an awkwardly wedged
155    state where they'll no longer respond to any signals or graceful kills, and
156    must be forcibly killed via OpenProcess()/TerminateProcess().
157
158    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
159    instead of SO_REUSEADDR, which effectively affords the same semantics as
160    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
161    Source world compared to Windows ones, this is a common mistake.  A quick
162    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
163    openssl.exe is called with the 's_server' option, for example. See
164    http://bugs.python.org/issue2550 for more info.  The following site also
165    has a very thorough description about the implications of both REUSEADDR
166    and EXCLUSIVEADDRUSE on Windows:
167    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
168
169    XXX: although this approach is a vast improvement on previous attempts to
170    elicit unused ports, it rests heavily on the assumption that the ephemeral
171    port returned to us by the OS won't immediately be dished back out to some
172    other process when we close and delete our temporary socket but before our
173    calling code has a chance to bind the returned port.  We can deal with this
174    issue if/when we come across it."""
175    tempsock = socket.socket(family, socktype)
176    port = bind_port(tempsock)
177    tempsock.close()
178    del tempsock
179    return port
180
181def bind_port(sock, host=HOST):
182    """Bind the socket to a free port and return the port number.  Relies on
183    ephemeral ports in order to ensure we are using an unbound port.  This is
184    important as many tests may be running simultaneously, especially in a
185    buildbot environment.  This method raises an exception if the sock.family
186    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
187    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
188    for TCP/IP sockets.  The only case for setting these options is testing
189    multicasting via multiple UDP sockets.
190
191    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
192    on Windows), it will be set on the socket.  This will prevent anyone else
193    from bind()'ing to our host/port for the duration of the test.
194    """
195    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
196        if hasattr(socket, 'SO_REUSEADDR'):
197            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
198                raise TestFailed("tests should never set the SO_REUSEADDR "   \
199                                 "socket option on TCP/IP sockets!")
200        if hasattr(socket, 'SO_REUSEPORT'):
201            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
202                raise TestFailed("tests should never set the SO_REUSEPORT "   \
203                                 "socket option on TCP/IP sockets!")
204        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
205            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
206
207    sock.bind((host, 0))
208    port = sock.getsockname()[1]
209    return port
210
211FUZZ = 1e-6
212
213def fcmp(x, y): # fuzzy comparison function
214    if type(x) == type(0.0) or type(y) == type(0.0):
215        try:
216            x, y = coerce(x, y)
217            fuzz = (abs(x) + abs(y)) * FUZZ
218            if abs(x-y) <= fuzz:
219                return 0
220        except:
221            pass
222    elif type(x) == type(y) and type(x) in (type(()), type([])):
223        for i in range(min(len(x), len(y))):
224            outcome = fcmp(x[i], y[i])
225            if outcome != 0:
226                return outcome
227        return cmp(len(x), len(y))
228    return cmp(x, y)
229
230try:
231    unicode
232    have_unicode = True
233except NameError:
234    have_unicode = False
235
236is_jython = sys.platform.startswith('java')
237
238# Filename used for testing
239if os.name == 'java':
240    # Jython disallows @ in module names
241    TESTFN = '$test'
242elif os.name == 'riscos':
243    TESTFN = 'testfile'
244else:
245    TESTFN = '@test'
246    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
247    if have_unicode:
248        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
249        # TESTFN_UNICODE is a filename that can be encoded using the
250        # file system encoding, but *not* with the default (ascii) encoding
251        if isinstance('', unicode):
252            # python -U
253            # XXX perhaps unicode() should accept Unicode strings?
254            TESTFN_UNICODE = "@test-\xe0\xf2"
255        else:
256            # 2 latin characters.
257            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
258        TESTFN_ENCODING = sys.getfilesystemencoding()
259        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
260        # able to be encoded by *either* the default or filesystem encoding.
261        # This test really only makes sense on Windows NT platforms
262        # which have special Unicode support in posixmodule.
263        if (not hasattr(sys, "getwindowsversion") or
264                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
265            TESTFN_UNICODE_UNENCODEABLE = None
266        else:
267            # Japanese characters (I think - from bug 846133)
268            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
269            try:
270                # XXX - Note - should be using TESTFN_ENCODING here - but for
271                # Windows, "mbcs" currently always operates as if in
272                # errors=ignore' mode - hence we get '?' characters rather than
273                # the exception.  'Latin1' operates as we expect - ie, fails.
274                # See [ 850997 ] mbcs encoding ignores errors
275                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
276            except UnicodeEncodeError:
277                pass
278            else:
279                print \
280                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
281                'Unicode filename tests may not be effective' \
282                % TESTFN_UNICODE_UNENCODEABLE
283
284# Make sure we can write to TESTFN, try in /tmp if we can't
285fp = None
286try:
287    fp = open(TESTFN, 'w+')
288except IOError:
289    TMP_TESTFN = os.path.join('/tmp', TESTFN)
290    try:
291        fp = open(TMP_TESTFN, 'w+')
292        TESTFN = TMP_TESTFN
293        del TMP_TESTFN
294    except IOError:
295        print ('WARNING: tests will fail, unable to write to: %s or %s' %
296                (TESTFN, TMP_TESTFN))
297if fp is not None:
298    fp.close()
299    unlink(TESTFN)
300del fp
301
302def findfile(file, here=__file__):
303    """Try to find a file on sys.path and the working directory.  If it is not
304    found the argument passed to the function is returned (this does not
305    necessarily signal failure; could still be the legitimate path)."""
306    if os.path.isabs(file):
307        return file
308    path = sys.path
309    path = [os.path.dirname(here)] + path
310    for dn in path:
311        fn = os.path.join(dn, file)
312        if os.path.exists(fn): return fn
313    return file
314
315def verify(condition, reason='test failed'):
316    """Verify that condition is true. If not, raise TestFailed.
317
318       The optional argument reason can be given to provide
319       a better error text.
320    """
321
322    if not condition:
323        raise TestFailed(reason)
324
325def vereq(a, b):
326    """Raise TestFailed if a == b is false.
327
328    This is better than verify(a == b) because, in case of failure, the
329    error message incorporates repr(a) and repr(b) so you can see the
330    inputs.
331
332    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
333    former is tested.
334    """
335
336    if not (a == b):
337        raise TestFailed("%r == %r" % (a, b))
338
339def sortdict(dict):
340    "Like repr(dict), but in sorted order."
341    items = dict.items()
342    items.sort()
343    reprpairs = ["%r: %r" % pair for pair in items]
344    withcommas = ", ".join(reprpairs)
345    return "{%s}" % withcommas
346
347def check_syntax_error(testcase, statement):
348    try:
349        compile(statement, '<test string>', 'exec')
350    except SyntaxError:
351        pass
352    else:
353        testcase.fail('Missing SyntaxError: "%s"' % statement)
354
355def open_urlresource(url):
356    import urllib, urlparse
357
358    requires('urlfetch')
359    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
360
361    for path in [os.path.curdir, os.path.pardir]:
362        fn = os.path.join(path, filename)
363        if os.path.exists(fn):
364            return open(fn)
365
366    print >> get_original_stdout(), '\tfetching %s ...' % url
367    fn, _ = urllib.urlretrieve(url, filename)
368    return open(fn)
369
370
371class WarningMessage(object):
372    "Holds the result of the latest showwarning() call"
373    def __init__(self):
374        self.message = None
375        self.category = None
376        self.filename = None
377        self.lineno = None
378
379    def _showwarning(self, message, category, filename, lineno, file=None,
380                        line=None):
381        self.message = message
382        self.category = category
383        self.filename = filename
384        self.lineno = lineno
385        self.line = line
386
387    def reset(self):
388        self._showwarning(*((None,)*6))
389
390    def __str__(self):
391        return ("{message : %r, category : %r, filename : %r, lineno : %s, "
392                    "line : %r}" % (self.message,
393                            self.category.__name__ if self.category else None,
394                            self.filename, self.lineno, self.line))
395
396
397@contextlib.contextmanager
398def catch_warning(module=warnings):
399    """
400    Guard the warnings filter from being permanently changed and record the
401    data of the last warning that has been issued.
402
403    Use like this:
404
405        with catch_warning() as w:
406            warnings.warn("foo")
407            assert str(w.message) == "foo"
408    """
409    warning_obj = WarningMessage()
410    original_filters = module.filters[:]
411    original_showwarning = module.showwarning
412    module.showwarning = warning_obj._showwarning
413    try:
414        yield warning_obj
415    finally:
416        module.showwarning = original_showwarning
417        module.filters = original_filters
418
419class EnvironmentVarGuard(object):
420
421    """Class to help protect the environment variable properly.  Can be used as
422    a context manager."""
423
424    def __init__(self):
425        self._environ = os.environ
426        self._unset = set()
427        self._reset = dict()
428
429    def set(self, envvar, value):
430        if envvar not in self._environ:
431            self._unset.add(envvar)
432        else:
433            self._reset[envvar] = self._environ[envvar]
434        self._environ[envvar] = value
435
436    def unset(self, envvar):
437        if envvar in self._environ:
438            self._reset[envvar] = self._environ[envvar]
439            del self._environ[envvar]
440
441    def __enter__(self):
442        return self
443
444    def __exit__(self, *ignore_exc):
445        for envvar, value in self._reset.iteritems():
446            self._environ[envvar] = value
447        for unset in self._unset:
448            del self._environ[unset]
449
450class TransientResource(object):
451
452    """Raise ResourceDenied if an exception is raised while the context manager
453    is in effect that matches the specified exception and attributes."""
454
455    def __init__(self, exc, **kwargs):
456        self.exc = exc
457        self.attrs = kwargs
458
459    def __enter__(self):
460        return self
461
462    def __exit__(self, type_=None, value=None, traceback=None):
463        """If type_ is a subclass of self.exc and value has attributes matching
464        self.attrs, raise ResourceDenied.  Otherwise let the exception
465        propagate (if any)."""
466        if type_ is not None and issubclass(self.exc, type_):
467            for attr, attr_value in self.attrs.iteritems():
468                if not hasattr(value, attr):
469                    break
470                if getattr(value, attr) != attr_value:
471                    break
472            else:
473                raise ResourceDenied("an optional resource is not available")
474
475
476def transient_internet():
477    """Return a context manager that raises ResourceDenied when various issues
478    with the Internet connection manifest themselves as exceptions."""
479    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
480    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
481    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
482    return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
483
484
485@contextlib.contextmanager
486def captured_output(stream_name):
487    """Run the 'with' statement body using a StringIO object in place of a
488    specific attribute on the sys module.
489    Example use (with 'stream_name=stdout')::
490
491       with captured_stdout() as s:
492           print "hello"
493       assert s.getvalue() == "hello"
494    """
495    import StringIO
496    orig_stdout = getattr(sys, stream_name)
497    setattr(sys, stream_name, StringIO.StringIO())
498    try:
499        yield getattr(sys, stream_name)
500    finally:
501        setattr(sys, stream_name, orig_stdout)
502
503def captured_stdout():
504    return captured_output("stdout")
505
506
507#=======================================================================
508# Decorator for running a function in a different locale, correctly resetting
509# it afterwards.
510
511def run_with_locale(catstr, *locales):
512    def decorator(func):
513        def inner(*args, **kwds):
514            try:
515                import locale
516                category = getattr(locale, catstr)
517                orig_locale = locale.setlocale(category)
518            except AttributeError:
519                # if the test author gives us an invalid category string
520                raise
521            except:
522                # cannot retrieve original locale, so do nothing
523                locale = orig_locale = None
524            else:
525                for loc in locales:
526                    try:
527                        locale.setlocale(category, loc)
528                        break
529                    except:
530                        pass
531
532            # now run the function, resetting the locale on exceptions
533            try:
534                return func(*args, **kwds)
535            finally:
536                if locale and orig_locale:
537                    locale.setlocale(category, orig_locale)
538        inner.func_name = func.func_name
539        inner.__doc__ = func.__doc__
540        return inner
541    return decorator
542
543#=======================================================================
544# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
545
546# Some handy shorthands. Note that these are used for byte-limits as well
547# as size-limits, in the various bigmem tests
548_1M = 1024*1024
549_1G = 1024 * _1M
550_2G = 2 * _1G
551
552# Hack to get at the maximum value an internal index can take.
553class _Dummy:
554    def __getslice__(self, i, j):
555        return j
556MAX_Py_ssize_t = _Dummy()[:]
557
558def set_memlimit(limit):
559    import re
560    global max_memuse
561    sizes = {
562        'k': 1024,
563        'm': _1M,
564        'g': _1G,
565        't': 1024*_1G,
566    }
567    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
568                 re.IGNORECASE | re.VERBOSE)
569    if m is None:
570        raise ValueError('Invalid memory limit %r' % (limit,))
571    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
572    if memlimit > MAX_Py_ssize_t:
573        memlimit = MAX_Py_ssize_t
574    if memlimit < _2G - 1:
575        raise ValueError('Memory limit %r too low to be useful' % (limit,))
576    max_memuse = memlimit
577
578def bigmemtest(minsize, memuse, overhead=5*_1M):
579    """Decorator for bigmem tests.
580
581    'minsize' is the minimum useful size for the test (in arbitrary,
582    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
583    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
584    independent of the testsize, and defaults to 5Mb.
585
586    The decorator tries to guess a good value for 'size' and passes it to
587    the decorated test function. If minsize * memuse is more than the
588    allowed memory use (as defined by max_memuse), the test is skipped.
589    Otherwise, minsize is adjusted upward to use up to max_memuse.
590    """
591    def decorator(f):
592        def wrapper(self):
593            if not max_memuse:
594                # If max_memuse is 0 (the default),
595                # we still want to run the tests with size set to a few kb,
596                # to make sure they work. We still want to avoid using
597                # too much memory, though, but we do that noisily.
598                maxsize = 5147
599                self.failIf(maxsize * memuse + overhead > 20 * _1M)
600            else:
601                maxsize = int((max_memuse - overhead) / memuse)
602                if maxsize < minsize:
603                    # Really ought to print 'test skipped' or something
604                    if verbose:
605                        sys.stderr.write("Skipping %s because of memory "
606                                         "constraint\n" % (f.__name__,))
607                    return
608                # Try to keep some breathing room in memory use
609                maxsize = max(maxsize - 50 * _1M, minsize)
610            return f(self, maxsize)
611        wrapper.minsize = minsize
612        wrapper.memuse = memuse
613        wrapper.overhead = overhead
614        return wrapper
615    return decorator
616
617def bigaddrspacetest(f):
618    """Decorator for tests that fill the address space."""
619    def wrapper(self):
620        if max_memuse < MAX_Py_ssize_t:
621            if verbose:
622                sys.stderr.write("Skipping %s because of memory "
623                                 "constraint\n" % (f.__name__,))
624        else:
625            return f(self)
626    return wrapper
627
628#=======================================================================
629# unittest integration.
630
631class BasicTestRunner:
632    def run(self, test):
633        result = unittest.TestResult()
634        test(result)
635        return result
636
637
638def _run_suite(suite):
639    """Run tests from a unittest.TestSuite-derived class."""
640    if verbose:
641        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
642    else:
643        runner = BasicTestRunner()
644
645    result = runner.run(suite)
646    if not result.wasSuccessful():
647        if len(result.errors) == 1 and not result.failures:
648            err = result.errors[0][1]
649        elif len(result.failures) == 1 and not result.errors:
650            err = result.failures[0][1]
651        else:
652            err = "errors occurred; run in verbose mode for details"
653        raise TestFailed(err)
654
655
656def run_unittest(*classes):
657    """Run tests from unittest.TestCase-derived classes."""
658    valid_types = (unittest.TestSuite, unittest.TestCase)
659    suite = unittest.TestSuite()
660    for cls in classes:
661        if isinstance(cls, str):
662            if cls in sys.modules:
663                suite.addTest(unittest.findTestCases(sys.modules[cls]))
664            else:
665                raise ValueError("str arguments must be keys in sys.modules")
666        elif isinstance(cls, valid_types):
667            suite.addTest(cls)
668        else:
669            suite.addTest(unittest.makeSuite(cls))
670    _run_suite(suite)
671
672
673#=======================================================================
674# doctest driver.
675
676def run_doctest(module, verbosity=None):
677    """Run doctest on the given module.  Return (#failures, #tests).
678
679    If optional argument verbosity is not specified (or is None), pass
680    test_support's belief about verbosity on to doctest.  Else doctest's
681    usual behavior is used (it searches sys.argv for -v).
682    """
683
684    import doctest
685
686    if verbosity is None:
687        verbosity = verbose
688    else:
689        verbosity = None
690
691    # Direct doctest output (normally just errors) to real stdout; doctest
692    # output shouldn't be compared by regrtest.
693    save_stdout = sys.stdout
694    sys.stdout = get_original_stdout()
695    try:
696        f, t = doctest.testmod(module, verbose=verbosity)
697        if f:
698            raise TestFailed("%d of %d doctests failed" % (f, t))
699    finally:
700        sys.stdout = save_stdout
701    if verbose:
702        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
703    return f, t
704
705#=======================================================================
706# Threading support to prevent reporting refleaks when running regrtest.py -R
707
708def threading_setup():
709    import threading
710    return len(threading._active), len(threading._limbo)
711
712def threading_cleanup(num_active, num_limbo):
713    import threading
714    import time
715
716    _MAX_COUNT = 10
717    count = 0
718    while len(threading._active) != num_active and count < _MAX_COUNT:
719        count += 1
720        time.sleep(0.1)
721
722    count = 0
723    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
724        count += 1
725        time.sleep(0.1)
726
727def reap_children():
728    """Use this function at the end of test_main() whenever sub-processes
729    are started.  This will help ensure that no extra children (zombies)
730    stick around to hog resources and create problems when looking
731    for refleaks.
732    """
733
734    # Reap all our dead child processes so we don't leave zombies around.
735    # These hog resources and might be causing some of the buildbots to die.
736    if hasattr(os, 'waitpid'):
737        any_process = -1
738        while True:
739            try:
740                # This will raise an exception on Windows.  That's ok.
741                pid, status = os.waitpid(any_process, os.WNOHANG)
742                if pid == 0:
743                    break
744            except:
745                break
746