test_support.py revision d8f2d0bdb3d3b9abe53cda9add979e2fc4be7da0
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.test_support':
4    raise ImportError('test_support must be imported from the test package')
5
6import contextlib
7import errno
8import socket
9import sys
10import os
11import os.path
12import shutil
13import warnings
14import unittest
15
16class Error(Exception):
17    """Base class for regression test exceptions."""
18
19class TestFailed(Error):
20    """Test failed."""
21
22class TestSkipped(Error):
23    """Test skipped.
24
25    This can be raised to indicate that a test was deliberatly
26    skipped, but not because a feature wasn't available.  For
27    example, if some resource can't be used, such as the network
28    appears to be unavailable, this should be raised instead of
29    TestFailed.
30    """
31
32class ResourceDenied(TestSkipped):
33    """Test skipped because it requested a disallowed resource.
34
35    This is raised when a test calls requires() for a resource that
36    has not be enabled.  It is used to distinguish between expected
37    and unexpected skips.
38    """
39
40verbose = 1              # Flag set to 0 by regrtest.py
41use_resources = None     # Flag set to [] by regrtest.py
42max_memuse = 0           # Disable bigmem tests (they will still be run with
43                         # small sizes, to make sure they work.)
44
45# _original_stdout is meant to hold stdout at the time regrtest began.
46# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
47# The point is to have some flavor of stdout the user can actually see.
48_original_stdout = None
49def record_original_stdout(stdout):
50    global _original_stdout
51    _original_stdout = stdout
52
53def get_original_stdout():
54    return _original_stdout or sys.stdout
55
56def unload(name):
57    try:
58        del sys.modules[name]
59    except KeyError:
60        pass
61
62def unlink(filename):
63    try:
64        os.unlink(filename)
65    except OSError:
66        pass
67
68def rmtree(path):
69    try:
70        shutil.rmtree(path)
71    except OSError, e:
72        # Unix returns ENOENT, Windows returns ESRCH.
73        if e.errno not in (errno.ENOENT, errno.ESRCH):
74            raise
75
76def forget(modname):
77    '''"Forget" a module was ever imported by removing it from sys.modules and
78    deleting any .pyc and .pyo files.'''
79    unload(modname)
80    for dirname in sys.path:
81        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
82        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
83        # the chance exists that there is no .pyc (and thus the 'try' statement
84        # is exited) but there is a .pyo file.
85        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
86
87def is_resource_enabled(resource):
88    """Test whether a resource is enabled.  Known resources are set by
89    regrtest.py."""
90    return use_resources is not None and resource in use_resources
91
92def requires(resource, msg=None):
93    """Raise ResourceDenied if the specified resource is not available.
94
95    If the caller's module is __main__ then automatically return True.  The
96    possibility of False being returned occurs when regrtest.py is executing."""
97    # see if the caller's module is __main__ - if so, treat as if
98    # the resource was set
99    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
100        return
101    if not is_resource_enabled(resource):
102        if msg is None:
103            msg = "Use of the `%s' resource not enabled" % resource
104        raise ResourceDenied(msg)
105
106HOST = 'localhost'
107
108def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
109    """Returns an unused port that should be suitable for binding.  This is
110    achieved by creating a temporary socket with the same family and type as
111    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
112    the specified host address (defaults to 0.0.0.0) with the port set to 0,
113    eliciting an unused ephemeral port from the OS.  The temporary socket is
114    then closed and deleted, and the ephemeral port is returned.
115
116    Either this method or bind_port() should be used for any tests where a
117    server socket needs to be bound to a particular port for the duration of
118    the test.  Which one to use depends on whether the calling code is creating
119    a python socket, or if an unused port needs to be provided in a constructor
120    or passed to an external program (i.e. the -accept argument to openssl's
121    s_server mode).  Always prefer bind_port() over find_unused_port() where
122    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
123    socket is bound to a hard coded port, the ability to run multiple instances
124    of the test simultaneously on the same host is compromised, which makes the
125    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
126    may simply manifest as a failed test, which can be recovered from without
127    intervention in most cases, but on Windows, the entire python process can
128    completely and utterly wedge, requiring someone to log in to the buildbot
129    and manually kill the affected process.
130
131    (This is easy to reproduce on Windows, unfortunately, and can be traced to
132    the SO_REUSEADDR socket option having different semantics on Windows versus
133    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
134    listen and then accept connections on identical host/ports.  An EADDRINUSE
135    socket.error will be raised at some point (depending on the platform and
136    the order bind and listen were called on each socket).
137
138    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
139    will ever be raised when attempting to bind two identical host/ports. When
140    accept() is called on each socket, the second caller's process will steal
141    the port from the first caller, leaving them both in an awkwardly wedged
142    state where they'll no longer respond to any signals or graceful kills, and
143    must be forcibly killed via OpenProcess()/TerminateProcess().
144
145    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
146    instead of SO_REUSEADDR, which effectively affords the same semantics as
147    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
148    Source world compared to Windows ones, this is a common mistake.  A quick
149    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
150    openssl.exe is called with the 's_server' option, for example. See
151    http://bugs.python.org/issue2550 for more info.  The following site also
152    has a very thorough description about the implications of both REUSEADDR
153    and EXCLUSIVEADDRUSE on Windows:
154    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
155
156    XXX: although this approach is a vast improvement on previous attempts to
157    elicit unused ports, it rests heavily on the assumption that the ephemeral
158    port returned to us by the OS won't immediately be dished back out to some
159    other process when we close and delete our temporary socket but before our
160    calling code has a chance to bind the returned port.  We can deal with this
161    issue if/when we come across it."""
162    tempsock = socket.socket(family, socktype)
163    port = bind_port(tempsock)
164    tempsock.close()
165    del tempsock
166    return port
167
168def bind_port(sock, host=HOST):
169    """Bind the socket to a free port and return the port number.  Relies on
170    ephemeral ports in order to ensure we are using an unbound port.  This is
171    important as many tests may be running simultaneously, especially in a
172    buildbot environment.  This method raises an exception if the sock.family
173    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
174    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
175    for TCP/IP sockets.  The only case for setting these options is testing
176    multicasting via multiple UDP sockets.
177
178    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
179    on Windows), it will be set on the socket.  This will prevent anyone else
180    from bind()'ing to our host/port for the duration of the test.
181    """
182    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
183        if hasattr(socket, 'SO_REUSEADDR'):
184            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
185                raise TestFailed("tests should never set the SO_REUSEADDR "   \
186                                 "socket option on TCP/IP sockets!")
187        if hasattr(socket, 'SO_REUSEPORT'):
188            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
189                raise TestFailed("tests should never set the SO_REUSEPORT "   \
190                                 "socket option on TCP/IP sockets!")
191        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
192            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
193
194    sock.bind((host, 0))
195    port = sock.getsockname()[1]
196    return port
197
198FUZZ = 1e-6
199
200def fcmp(x, y): # fuzzy comparison function
201    if type(x) == type(0.0) or type(y) == type(0.0):
202        try:
203            x, y = coerce(x, y)
204            fuzz = (abs(x) + abs(y)) * FUZZ
205            if abs(x-y) <= fuzz:
206                return 0
207        except:
208            pass
209    elif type(x) == type(y) and type(x) in (type(()), type([])):
210        for i in range(min(len(x), len(y))):
211            outcome = fcmp(x[i], y[i])
212            if outcome != 0:
213                return outcome
214        return cmp(len(x), len(y))
215    return cmp(x, y)
216
217try:
218    unicode
219    have_unicode = True
220except NameError:
221    have_unicode = False
222
223is_jython = sys.platform.startswith('java')
224
225# Filename used for testing
226if os.name == 'java':
227    # Jython disallows @ in module names
228    TESTFN = '$test'
229elif os.name == 'riscos':
230    TESTFN = 'testfile'
231else:
232    TESTFN = '@test'
233    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
234    if have_unicode:
235        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
236        # TESTFN_UNICODE is a filename that can be encoded using the
237        # file system encoding, but *not* with the default (ascii) encoding
238        if isinstance('', unicode):
239            # python -U
240            # XXX perhaps unicode() should accept Unicode strings?
241            TESTFN_UNICODE = "@test-\xe0\xf2"
242        else:
243            # 2 latin characters.
244            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
245        TESTFN_ENCODING = sys.getfilesystemencoding()
246        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
247        # able to be encoded by *either* the default or filesystem encoding.
248        # This test really only makes sense on Windows NT platforms
249        # which have special Unicode support in posixmodule.
250        if (not hasattr(sys, "getwindowsversion") or
251                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
252            TESTFN_UNICODE_UNENCODEABLE = None
253        else:
254            # Japanese characters (I think - from bug 846133)
255            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
256            try:
257                # XXX - Note - should be using TESTFN_ENCODING here - but for
258                # Windows, "mbcs" currently always operates as if in
259                # errors=ignore' mode - hence we get '?' characters rather than
260                # the exception.  'Latin1' operates as we expect - ie, fails.
261                # See [ 850997 ] mbcs encoding ignores errors
262                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
263            except UnicodeEncodeError:
264                pass
265            else:
266                print \
267                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
268                'Unicode filename tests may not be effective' \
269                % TESTFN_UNICODE_UNENCODEABLE
270
271# Make sure we can write to TESTFN, try in /tmp if we can't
272fp = None
273try:
274    fp = open(TESTFN, 'w+')
275except IOError:
276    TMP_TESTFN = os.path.join('/tmp', TESTFN)
277    try:
278        fp = open(TMP_TESTFN, 'w+')
279        TESTFN = TMP_TESTFN
280        del TMP_TESTFN
281    except IOError:
282        print ('WARNING: tests will fail, unable to write to: %s or %s' %
283                (TESTFN, TMP_TESTFN))
284if fp is not None:
285    fp.close()
286    unlink(TESTFN)
287del fp
288
289def findfile(file, here=__file__):
290    """Try to find a file on sys.path and the working directory.  If it is not
291    found the argument passed to the function is returned (this does not
292    necessarily signal failure; could still be the legitimate path)."""
293    if os.path.isabs(file):
294        return file
295    path = sys.path
296    path = [os.path.dirname(here)] + path
297    for dn in path:
298        fn = os.path.join(dn, file)
299        if os.path.exists(fn): return fn
300    return file
301
302def verify(condition, reason='test failed'):
303    """Verify that condition is true. If not, raise TestFailed.
304
305       The optional argument reason can be given to provide
306       a better error text.
307    """
308
309    if not condition:
310        raise TestFailed(reason)
311
312def vereq(a, b):
313    """Raise TestFailed if a == b is false.
314
315    This is better than verify(a == b) because, in case of failure, the
316    error message incorporates repr(a) and repr(b) so you can see the
317    inputs.
318
319    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
320    former is tested.
321    """
322
323    if not (a == b):
324        raise TestFailed("%r == %r" % (a, b))
325
326def sortdict(dict):
327    "Like repr(dict), but in sorted order."
328    items = dict.items()
329    items.sort()
330    reprpairs = ["%r: %r" % pair for pair in items]
331    withcommas = ", ".join(reprpairs)
332    return "{%s}" % withcommas
333
334def check_syntax_error(testcase, statement):
335    try:
336        compile(statement, '<test string>', 'exec')
337    except SyntaxError:
338        pass
339    else:
340        testcase.fail('Missing SyntaxError: "%s"' % statement)
341
342def open_urlresource(url):
343    import urllib, urlparse
344
345    requires('urlfetch')
346    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
347
348    for path in [os.path.curdir, os.path.pardir]:
349        fn = os.path.join(path, filename)
350        if os.path.exists(fn):
351            return open(fn)
352
353    print >> get_original_stdout(), '\tfetching %s ...' % url
354    fn, _ = urllib.urlretrieve(url, filename)
355    return open(fn)
356
357
358class WarningMessage(object):
359    "Holds the result of the latest showwarning() call"
360    def __init__(self):
361        self.message = None
362        self.category = None
363        self.filename = None
364        self.lineno = None
365
366    def _showwarning(self, message, category, filename, lineno, file=None,
367                        line=None):
368        self.message = message
369        self.category = category
370        self.filename = filename
371        self.lineno = lineno
372        self.line = line
373
374    def reset(self):
375        self._showwarning(*((None,)*6))
376
377    def __str__(self):
378        return ("{message : %r, category : %r, filename : %r, lineno : %s, "
379                    "line : %r}" % (self.message,
380                            self.category.__name__ if self.category else None,
381                            self.filename, self.lineno, self.line))
382
383
384@contextlib.contextmanager
385def catch_warning(module=warnings):
386    """
387    Guard the warnings filter from being permanently changed and record the
388    data of the last warning that has been issued.
389
390    Use like this:
391
392        with catch_warning() as w:
393            warnings.warn("foo")
394            assert str(w.message) == "foo"
395    """
396    warning_obj = WarningMessage()
397    original_filters = module.filters[:]
398    original_showwarning = module.showwarning
399    module.showwarning = warning_obj._showwarning
400    try:
401        yield warning_obj
402    finally:
403        module.showwarning = original_showwarning
404        module.filters = original_filters
405
406class EnvironmentVarGuard(object):
407
408    """Class to help protect the environment variable properly.  Can be used as
409    a context manager."""
410
411    def __init__(self):
412        self._environ = os.environ
413        self._unset = set()
414        self._reset = dict()
415
416    def set(self, envvar, value):
417        if envvar not in self._environ:
418            self._unset.add(envvar)
419        else:
420            self._reset[envvar] = self._environ[envvar]
421        self._environ[envvar] = value
422
423    def unset(self, envvar):
424        if envvar in self._environ:
425            self._reset[envvar] = self._environ[envvar]
426            del self._environ[envvar]
427
428    def __enter__(self):
429        return self
430
431    def __exit__(self, *ignore_exc):
432        for envvar, value in self._reset.iteritems():
433            self._environ[envvar] = value
434        for unset in self._unset:
435            del self._environ[unset]
436
437class TransientResource(object):
438
439    """Raise ResourceDenied if an exception is raised while the context manager
440    is in effect that matches the specified exception and attributes."""
441
442    def __init__(self, exc, **kwargs):
443        self.exc = exc
444        self.attrs = kwargs
445
446    def __enter__(self):
447        return self
448
449    def __exit__(self, type_=None, value=None, traceback=None):
450        """If type_ is a subclass of self.exc and value has attributes matching
451        self.attrs, raise ResourceDenied.  Otherwise let the exception
452        propagate (if any)."""
453        if type_ is not None and issubclass(self.exc, type_):
454            for attr, attr_value in self.attrs.iteritems():
455                if not hasattr(value, attr):
456                    break
457                if getattr(value, attr) != attr_value:
458                    break
459            else:
460                raise ResourceDenied("an optional resource is not available")
461
462
463def transient_internet():
464    """Return a context manager that raises ResourceDenied when various issues
465    with the Internet connection manifest themselves as exceptions."""
466    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
467    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
468    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
469    return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
470
471
472@contextlib.contextmanager
473def captured_output(stream_name):
474    """Run the 'with' statement body using a StringIO object in place of a
475    specific attribute on the sys module.
476    Example use (with 'stream_name=stdout')::
477
478       with captured_stdout() as s:
479           print "hello"
480       assert s.getvalue() == "hello"
481    """
482    import StringIO
483    orig_stdout = getattr(sys, stream_name)
484    setattr(sys, stream_name, StringIO.StringIO())
485    try:
486        yield getattr(sys, stream_name)
487    finally:
488        setattr(sys, stream_name, orig_stdout)
489
490def captured_stdout():
491    return captured_output("stdout")
492
493
494#=======================================================================
495# Decorator for running a function in a different locale, correctly resetting
496# it afterwards.
497
498def run_with_locale(catstr, *locales):
499    def decorator(func):
500        def inner(*args, **kwds):
501            try:
502                import locale
503                category = getattr(locale, catstr)
504                orig_locale = locale.setlocale(category)
505            except AttributeError:
506                # if the test author gives us an invalid category string
507                raise
508            except:
509                # cannot retrieve original locale, so do nothing
510                locale = orig_locale = None
511            else:
512                for loc in locales:
513                    try:
514                        locale.setlocale(category, loc)
515                        break
516                    except:
517                        pass
518
519            # now run the function, resetting the locale on exceptions
520            try:
521                return func(*args, **kwds)
522            finally:
523                if locale and orig_locale:
524                    locale.setlocale(category, orig_locale)
525        inner.func_name = func.func_name
526        inner.__doc__ = func.__doc__
527        return inner
528    return decorator
529
530#=======================================================================
531# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
532
533# Some handy shorthands. Note that these are used for byte-limits as well
534# as size-limits, in the various bigmem tests
535_1M = 1024*1024
536_1G = 1024 * _1M
537_2G = 2 * _1G
538
539# Hack to get at the maximum value an internal index can take.
540class _Dummy:
541    def __getslice__(self, i, j):
542        return j
543MAX_Py_ssize_t = _Dummy()[:]
544
545def set_memlimit(limit):
546    import re
547    global max_memuse
548    sizes = {
549        'k': 1024,
550        'm': _1M,
551        'g': _1G,
552        't': 1024*_1G,
553    }
554    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
555                 re.IGNORECASE | re.VERBOSE)
556    if m is None:
557        raise ValueError('Invalid memory limit %r' % (limit,))
558    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
559    if memlimit > MAX_Py_ssize_t:
560        memlimit = MAX_Py_ssize_t
561    if memlimit < _2G - 1:
562        raise ValueError('Memory limit %r too low to be useful' % (limit,))
563    max_memuse = memlimit
564
565def bigmemtest(minsize, memuse, overhead=5*_1M):
566    """Decorator for bigmem tests.
567
568    'minsize' is the minimum useful size for the test (in arbitrary,
569    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
570    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
571    independent of the testsize, and defaults to 5Mb.
572
573    The decorator tries to guess a good value for 'size' and passes it to
574    the decorated test function. If minsize * memuse is more than the
575    allowed memory use (as defined by max_memuse), the test is skipped.
576    Otherwise, minsize is adjusted upward to use up to max_memuse.
577    """
578    def decorator(f):
579        def wrapper(self):
580            if not max_memuse:
581                # If max_memuse is 0 (the default),
582                # we still want to run the tests with size set to a few kb,
583                # to make sure they work. We still want to avoid using
584                # too much memory, though, but we do that noisily.
585                maxsize = 5147
586                self.failIf(maxsize * memuse + overhead > 20 * _1M)
587            else:
588                maxsize = int((max_memuse - overhead) / memuse)
589                if maxsize < minsize:
590                    # Really ought to print 'test skipped' or something
591                    if verbose:
592                        sys.stderr.write("Skipping %s because of memory "
593                                         "constraint\n" % (f.__name__,))
594                    return
595                # Try to keep some breathing room in memory use
596                maxsize = max(maxsize - 50 * _1M, minsize)
597            return f(self, maxsize)
598        wrapper.minsize = minsize
599        wrapper.memuse = memuse
600        wrapper.overhead = overhead
601        return wrapper
602    return decorator
603
604def bigaddrspacetest(f):
605    """Decorator for tests that fill the address space."""
606    def wrapper(self):
607        if max_memuse < MAX_Py_ssize_t:
608            if verbose:
609                sys.stderr.write("Skipping %s because of memory "
610                                 "constraint\n" % (f.__name__,))
611        else:
612            return f(self)
613    return wrapper
614
615#=======================================================================
616# unittest integration.
617
618class BasicTestRunner:
619    def run(self, test):
620        result = unittest.TestResult()
621        test(result)
622        return result
623
624
625def _run_suite(suite):
626    """Run tests from a unittest.TestSuite-derived class."""
627    if verbose:
628        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
629    else:
630        runner = BasicTestRunner()
631
632    result = runner.run(suite)
633    if not result.wasSuccessful():
634        if len(result.errors) == 1 and not result.failures:
635            err = result.errors[0][1]
636        elif len(result.failures) == 1 and not result.errors:
637            err = result.failures[0][1]
638        else:
639            err = "errors occurred; run in verbose mode for details"
640        raise TestFailed(err)
641
642
643def run_unittest(*classes):
644    """Run tests from unittest.TestCase-derived classes."""
645    valid_types = (unittest.TestSuite, unittest.TestCase)
646    suite = unittest.TestSuite()
647    for cls in classes:
648        if isinstance(cls, str):
649            if cls in sys.modules:
650                suite.addTest(unittest.findTestCases(sys.modules[cls]))
651            else:
652                raise ValueError("str arguments must be keys in sys.modules")
653        elif isinstance(cls, valid_types):
654            suite.addTest(cls)
655        else:
656            suite.addTest(unittest.makeSuite(cls))
657    _run_suite(suite)
658
659
660#=======================================================================
661# doctest driver.
662
663def run_doctest(module, verbosity=None):
664    """Run doctest on the given module.  Return (#failures, #tests).
665
666    If optional argument verbosity is not specified (or is None), pass
667    test_support's belief about verbosity on to doctest.  Else doctest's
668    usual behavior is used (it searches sys.argv for -v).
669    """
670
671    import doctest
672
673    if verbosity is None:
674        verbosity = verbose
675    else:
676        verbosity = None
677
678    # Direct doctest output (normally just errors) to real stdout; doctest
679    # output shouldn't be compared by regrtest.
680    save_stdout = sys.stdout
681    sys.stdout = get_original_stdout()
682    try:
683        f, t = doctest.testmod(module, verbose=verbosity)
684        if f:
685            raise TestFailed("%d of %d doctests failed" % (f, t))
686    finally:
687        sys.stdout = save_stdout
688    if verbose:
689        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
690    return f, t
691
692#=======================================================================
693# Threading support to prevent reporting refleaks when running regrtest.py -R
694
695def threading_setup():
696    import threading
697    return len(threading._active), len(threading._limbo)
698
699def threading_cleanup(num_active, num_limbo):
700    import threading
701    import time
702
703    _MAX_COUNT = 10
704    count = 0
705    while len(threading._active) != num_active and count < _MAX_COUNT:
706        count += 1
707        time.sleep(0.1)
708
709    count = 0
710    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
711        count += 1
712        time.sleep(0.1)
713
714def reap_children():
715    """Use this function at the end of test_main() whenever sub-processes
716    are started.  This will help ensure that no extra children (zombies)
717    stick around to hog resources and create problems when looking
718    for refleaks.
719    """
720
721    # Reap all our dead child processes so we don't leave zombies around.
722    # These hog resources and might be causing some of the buildbots to die.
723    if hasattr(os, 'waitpid'):
724        any_process = -1
725        while True:
726            try:
727                # This will raise an exception on Windows.  That's ok.
728                pid, status = os.waitpid(any_process, os.WNOHANG)
729                if pid == 0:
730                    break
731            except:
732                break
733