test_support.py revision 6057b2e645dd3b7a262553d5a547ef45d3fc5d93
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.test_support':
4    raise ImportError('test_support must be imported from the test package')
5
6import contextlib
7import errno
8import socket
9import sys
10import os
11import os.path
12import shutil
13import warnings
14import unittest
15
16class Error(Exception):
17    """Base class for regression test exceptions."""
18
19class TestFailed(Error):
20    """Test failed."""
21
22class TestSkipped(Error):
23    """Test skipped.
24
25    This can be raised to indicate that a test was deliberatly
26    skipped, but not because a feature wasn't available.  For
27    example, if some resource can't be used, such as the network
28    appears to be unavailable, this should be raised instead of
29    TestFailed.
30    """
31
32class ResourceDenied(TestSkipped):
33    """Test skipped because it requested a disallowed resource.
34
35    This is raised when a test calls requires() for a resource that
36    has not be enabled.  It is used to distinguish between expected
37    and unexpected skips.
38    """
39
40verbose = 1              # Flag set to 0 by regrtest.py
41use_resources = None     # Flag set to [] by regrtest.py
42max_memuse = 0           # Disable bigmem tests (they will still be run with
43                         # small sizes, to make sure they work.)
44
45# _original_stdout is meant to hold stdout at the time regrtest began.
46# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
47# The point is to have some flavor of stdout the user can actually see.
48_original_stdout = None
49def record_original_stdout(stdout):
50    global _original_stdout
51    _original_stdout = stdout
52
53def get_original_stdout():
54    return _original_stdout or sys.stdout
55
56def unload(name):
57    try:
58        del sys.modules[name]
59    except KeyError:
60        pass
61
62def unlink(filename):
63    try:
64        os.unlink(filename)
65    except OSError:
66        pass
67
68def rmtree(path):
69    try:
70        shutil.rmtree(path)
71    except OSError, e:
72        # Unix returns ENOENT, Windows returns ESRCH.
73        if e.errno not in (errno.ENOENT, errno.ESRCH):
74            raise
75
76def forget(modname):
77    '''"Forget" a module was ever imported by removing it from sys.modules and
78    deleting any .pyc and .pyo files.'''
79    unload(modname)
80    for dirname in sys.path:
81        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
82        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
83        # the chance exists that there is no .pyc (and thus the 'try' statement
84        # is exited) but there is a .pyo file.
85        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
86
87def is_resource_enabled(resource):
88    """Test whether a resource is enabled.  Known resources are set by
89    regrtest.py."""
90    return use_resources is not None and resource in use_resources
91
92def requires(resource, msg=None):
93    """Raise ResourceDenied if the specified resource is not available.
94
95    If the caller's module is __main__ then automatically return True.  The
96    possibility of False being returned occurs when regrtest.py is executing."""
97    # see if the caller's module is __main__ - if so, treat as if
98    # the resource was set
99    if sys._getframe().f_back.f_globals.get("__name__") == "__main__":
100        return
101    if not is_resource_enabled(resource):
102        if msg is None:
103            msg = "Use of the `%s' resource not enabled" % resource
104        raise ResourceDenied(msg)
105
106def bind_port(sock, host='', preferred_port=54321):
107    """Try to bind the sock to a port.  If we are running multiple
108    tests and we don't try multiple ports, the test can fails.  This
109    makes the test more robust."""
110
111    # Find some random ports that hopefully no one is listening on.
112    # Ideally each test would clean up after itself and not continue listening
113    # on any ports.  However, this isn't the case.  The last port (0) is
114    # a stop-gap that asks the O/S to assign a port.  Whenever the warning
115    # message below is printed, the test that is listening on the port should
116    # be fixed to close the socket at the end of the test.
117    # Another reason why we can't use a port is another process (possibly
118    # another instance of the test suite) is using the same port.
119    for port in [preferred_port, 9907, 10243, 32999, 0]:
120        try:
121            sock.bind((host, port))
122            if port == 0:
123                port = sock.getsockname()[1]
124            return port
125        except socket.error, (err, msg):
126            if err != errno.EADDRINUSE:
127                raise
128            print >>sys.__stderr__, \
129                '  WARNING: failed to listen on port %d, trying another' % port
130    raise TestFailed('unable to find port to listen on')
131
132FUZZ = 1e-6
133
134def fcmp(x, y): # fuzzy comparison function
135    if type(x) == type(0.0) or type(y) == type(0.0):
136        try:
137            x, y = coerce(x, y)
138            fuzz = (abs(x) + abs(y)) * FUZZ
139            if abs(x-y) <= fuzz:
140                return 0
141        except:
142            pass
143    elif type(x) == type(y) and type(x) in (type(()), type([])):
144        for i in range(min(len(x), len(y))):
145            outcome = fcmp(x[i], y[i])
146            if outcome != 0:
147                return outcome
148        return cmp(len(x), len(y))
149    return cmp(x, y)
150
151try:
152    unicode
153    have_unicode = True
154except NameError:
155    have_unicode = False
156
157is_jython = sys.platform.startswith('java')
158
159# Filename used for testing
160if os.name == 'java':
161    # Jython disallows @ in module names
162    TESTFN = '$test'
163elif os.name == 'riscos':
164    TESTFN = 'testfile'
165else:
166    TESTFN = '@test'
167    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
168    if have_unicode:
169        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
170        # TESTFN_UNICODE is a filename that can be encoded using the
171        # file system encoding, but *not* with the default (ascii) encoding
172        if isinstance('', unicode):
173            # python -U
174            # XXX perhaps unicode() should accept Unicode strings?
175            TESTFN_UNICODE = "@test-\xe0\xf2"
176        else:
177            # 2 latin characters.
178            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
179        TESTFN_ENCODING = sys.getfilesystemencoding()
180        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
181        # able to be encoded by *either* the default or filesystem encoding.
182        # This test really only makes sense on Windows NT platforms
183        # which have special Unicode support in posixmodule.
184        if (not hasattr(sys, "getwindowsversion") or
185                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
186            TESTFN_UNICODE_UNENCODEABLE = None
187        else:
188            # Japanese characters (I think - from bug 846133)
189            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
190            try:
191                # XXX - Note - should be using TESTFN_ENCODING here - but for
192                # Windows, "mbcs" currently always operates as if in
193                # errors=ignore' mode - hence we get '?' characters rather than
194                # the exception.  'Latin1' operates as we expect - ie, fails.
195                # See [ 850997 ] mbcs encoding ignores errors
196                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
197            except UnicodeEncodeError:
198                pass
199            else:
200                print \
201                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
202                'Unicode filename tests may not be effective' \
203                % TESTFN_UNICODE_UNENCODEABLE
204
205# Make sure we can write to TESTFN, try in /tmp if we can't
206fp = None
207try:
208    fp = open(TESTFN, 'w+')
209except IOError:
210    TMP_TESTFN = os.path.join('/tmp', TESTFN)
211    try:
212        fp = open(TMP_TESTFN, 'w+')
213        TESTFN = TMP_TESTFN
214        del TMP_TESTFN
215    except IOError:
216        print ('WARNING: tests will fail, unable to write to: %s or %s' %
217                (TESTFN, TMP_TESTFN))
218if fp is not None:
219    fp.close()
220    unlink(TESTFN)
221del fp
222
223def findfile(file, here=__file__):
224    """Try to find a file on sys.path and the working directory.  If it is not
225    found the argument passed to the function is returned (this does not
226    necessarily signal failure; could still be the legitimate path)."""
227    if os.path.isabs(file):
228        return file
229    path = sys.path
230    path = [os.path.dirname(here)] + path
231    for dn in path:
232        fn = os.path.join(dn, file)
233        if os.path.exists(fn): return fn
234    return file
235
236def verify(condition, reason='test failed'):
237    """Verify that condition is true. If not, raise TestFailed.
238
239       The optional argument reason can be given to provide
240       a better error text.
241    """
242
243    if not condition:
244        raise TestFailed(reason)
245
246def vereq(a, b):
247    """Raise TestFailed if a == b is false.
248
249    This is better than verify(a == b) because, in case of failure, the
250    error message incorporates repr(a) and repr(b) so you can see the
251    inputs.
252
253    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
254    former is tested.
255    """
256
257    if not (a == b):
258        raise TestFailed("%r == %r" % (a, b))
259
260def sortdict(dict):
261    "Like repr(dict), but in sorted order."
262    items = dict.items()
263    items.sort()
264    reprpairs = ["%r: %r" % pair for pair in items]
265    withcommas = ", ".join(reprpairs)
266    return "{%s}" % withcommas
267
268def check_syntax_error(testcase, statement):
269    try:
270        compile(statement, '<test string>', 'exec')
271    except SyntaxError:
272        pass
273    else:
274        testcase.fail('Missing SyntaxError: "%s"' % statement)
275
276def open_urlresource(url):
277    import urllib, urlparse
278
279    requires('urlfetch')
280    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
281
282    for path in [os.path.curdir, os.path.pardir]:
283        fn = os.path.join(path, filename)
284        if os.path.exists(fn):
285            return open(fn)
286
287    print >> get_original_stdout(), '\tfetching %s ...' % url
288    fn, _ = urllib.urlretrieve(url, filename)
289    return open(fn)
290
291
292class WarningMessage(object):
293    "Holds the result of the latest showwarning() call"
294    def __init__(self):
295        self.message = None
296        self.category = None
297        self.filename = None
298        self.lineno = None
299
300    def _showwarning(self, message, category, filename, lineno, file=None):
301        self.message = message
302        self.category = category
303        self.filename = filename
304        self.lineno = lineno
305
306@contextlib.contextmanager
307def catch_warning():
308    """
309    Guard the warnings filter from being permanently changed and record the
310    data of the last warning that has been issued.
311
312    Use like this:
313
314        with catch_warning() as w:
315            warnings.warn("foo")
316            assert str(w.message) == "foo"
317    """
318    warning = WarningMessage()
319    original_filters = warnings.filters[:]
320    original_showwarning = warnings.showwarning
321    warnings.showwarning = warning._showwarning
322    try:
323        yield warning
324    finally:
325        warnings.showwarning = original_showwarning
326        warnings.filters = original_filters
327
328class EnvironmentVarGuard(object):
329
330    """Class to help protect the environment variable properly.  Can be used as
331    a context manager."""
332
333    def __init__(self):
334        self._environ = os.environ
335        self._unset = set()
336        self._reset = dict()
337
338    def set(self, envvar, value):
339        if envvar not in self._environ:
340            self._unset.add(envvar)
341        else:
342            self._reset[envvar] = self._environ[envvar]
343        self._environ[envvar] = value
344
345    def unset(self, envvar):
346        if envvar in self._environ:
347            self._reset[envvar] = self._environ[envvar]
348            del self._environ[envvar]
349
350    def __enter__(self):
351        return self
352
353    def __exit__(self, *ignore_exc):
354        for envvar, value in self._reset.iteritems():
355            self._environ[envvar] = value
356        for unset in self._unset:
357            del self._environ[unset]
358
359class TransientResource(object):
360
361    """Raise ResourceDenied if an exception is raised while the context manager
362    is in effect that matches the specified exception and attributes."""
363
364    def __init__(self, exc, **kwargs):
365        self.exc = exc
366        self.attrs = kwargs
367
368    def __enter__(self):
369        return self
370
371    def __exit__(self, type_=None, value=None, traceback=None):
372        """If type_ is a subclass of self.exc and value has attributes matching
373        self.attrs, raise ResourceDenied.  Otherwise let the exception
374        propagate (if any)."""
375        if type_ is not None and issubclass(self.exc, type_):
376            for attr, attr_value in self.attrs.iteritems():
377                if not hasattr(value, attr):
378                    break
379                if getattr(value, attr) != attr_value:
380                    break
381            else:
382                raise ResourceDenied("an optional resource is not available")
383
384
385def transient_internet():
386    """Return a context manager that raises ResourceDenied when various issues
387    with the Internet connection manifest themselves as exceptions."""
388    time_out = TransientResource(IOError, errno=errno.ETIMEDOUT)
389    socket_peer_reset = TransientResource(socket.error, errno=errno.ECONNRESET)
390    ioerror_peer_reset = TransientResource(IOError, errno=errno.ECONNRESET)
391    return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
392
393
394@contextlib.contextmanager
395def captured_stdout():
396    """Run the with statement body using a StringIO object as sys.stdout.
397    Example use::
398
399       with captured_stdout() as s:
400           print "hello"
401       assert s.getvalue() == "hello"
402    """
403    import StringIO
404    orig_stdout = sys.stdout
405    sys.stdout = StringIO.StringIO()
406    yield sys.stdout
407    sys.stdout = orig_stdout
408
409
410#=======================================================================
411# Decorator for running a function in a different locale, correctly resetting
412# it afterwards.
413
414def run_with_locale(catstr, *locales):
415    def decorator(func):
416        def inner(*args, **kwds):
417            try:
418                import locale
419                category = getattr(locale, catstr)
420                orig_locale = locale.setlocale(category)
421            except AttributeError:
422                # if the test author gives us an invalid category string
423                raise
424            except:
425                # cannot retrieve original locale, so do nothing
426                locale = orig_locale = None
427            else:
428                for loc in locales:
429                    try:
430                        locale.setlocale(category, loc)
431                        break
432                    except:
433                        pass
434
435            # now run the function, resetting the locale on exceptions
436            try:
437                return func(*args, **kwds)
438            finally:
439                if locale and orig_locale:
440                    locale.setlocale(category, orig_locale)
441        inner.func_name = func.func_name
442        inner.__doc__ = func.__doc__
443        return inner
444    return decorator
445
446#=======================================================================
447# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
448
449# Some handy shorthands. Note that these are used for byte-limits as well
450# as size-limits, in the various bigmem tests
451_1M = 1024*1024
452_1G = 1024 * _1M
453_2G = 2 * _1G
454
455# Hack to get at the maximum value an internal index can take.
456class _Dummy:
457    def __getslice__(self, i, j):
458        return j
459MAX_Py_ssize_t = _Dummy()[:]
460
461def set_memlimit(limit):
462    import re
463    global max_memuse
464    sizes = {
465        'k': 1024,
466        'm': _1M,
467        'g': _1G,
468        't': 1024*_1G,
469    }
470    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
471                 re.IGNORECASE | re.VERBOSE)
472    if m is None:
473        raise ValueError('Invalid memory limit %r' % (limit,))
474    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
475    if memlimit > MAX_Py_ssize_t:
476        memlimit = MAX_Py_ssize_t
477    if memlimit < _2G - 1:
478        raise ValueError('Memory limit %r too low to be useful' % (limit,))
479    max_memuse = memlimit
480
481def bigmemtest(minsize, memuse, overhead=5*_1M):
482    """Decorator for bigmem tests.
483
484    'minsize' is the minimum useful size for the test (in arbitrary,
485    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
486    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
487    independant of the testsize, and defaults to 5Mb.
488
489    The decorator tries to guess a good value for 'size' and passes it to
490    the decorated test function. If minsize * memuse is more than the
491    allowed memory use (as defined by max_memuse), the test is skipped.
492    Otherwise, minsize is adjusted upward to use up to max_memuse.
493    """
494    def decorator(f):
495        def wrapper(self):
496            if not max_memuse:
497                # If max_memuse is 0 (the default),
498                # we still want to run the tests with size set to a few kb,
499                # to make sure they work. We still want to avoid using
500                # too much memory, though, but we do that noisily.
501                maxsize = 5147
502                self.failIf(maxsize * memuse + overhead > 20 * _1M)
503            else:
504                maxsize = int((max_memuse - overhead) / memuse)
505                if maxsize < minsize:
506                    # Really ought to print 'test skipped' or something
507                    if verbose:
508                        sys.stderr.write("Skipping %s because of memory "
509                                         "constraint\n" % (f.__name__,))
510                    return
511                # Try to keep some breathing room in memory use
512                maxsize = max(maxsize - 50 * _1M, minsize)
513            return f(self, maxsize)
514        wrapper.minsize = minsize
515        wrapper.memuse = memuse
516        wrapper.overhead = overhead
517        return wrapper
518    return decorator
519
520def bigaddrspacetest(f):
521    """Decorator for tests that fill the address space."""
522    def wrapper(self):
523        if max_memuse < MAX_Py_ssize_t:
524            if verbose:
525                sys.stderr.write("Skipping %s because of memory "
526                                 "constraint\n" % (f.__name__,))
527        else:
528            return f(self)
529    return wrapper
530
531#=======================================================================
532# unittest integration.
533
534class BasicTestRunner:
535    def run(self, test):
536        result = unittest.TestResult()
537        test(result)
538        return result
539
540
541def _run_suite(suite):
542    """Run tests from a unittest.TestSuite-derived class."""
543    if verbose:
544        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
545    else:
546        runner = BasicTestRunner()
547
548    result = runner.run(suite)
549    if not result.wasSuccessful():
550        if len(result.errors) == 1 and not result.failures:
551            err = result.errors[0][1]
552        elif len(result.failures) == 1 and not result.errors:
553            err = result.failures[0][1]
554        else:
555            err = "errors occurred; run in verbose mode for details"
556        raise TestFailed(err)
557
558
559def run_unittest(*classes):
560    """Run tests from unittest.TestCase-derived classes."""
561    valid_types = (unittest.TestSuite, unittest.TestCase)
562    suite = unittest.TestSuite()
563    for cls in classes:
564        if isinstance(cls, str):
565            if cls in sys.modules:
566                suite.addTest(unittest.findTestCases(sys.modules[cls]))
567            else:
568                raise ValueError("str arguments must be keys in sys.modules")
569        elif isinstance(cls, valid_types):
570            suite.addTest(cls)
571        else:
572            suite.addTest(unittest.makeSuite(cls))
573    _run_suite(suite)
574
575
576#=======================================================================
577# doctest driver.
578
579def run_doctest(module, verbosity=None):
580    """Run doctest on the given module.  Return (#failures, #tests).
581
582    If optional argument verbosity is not specified (or is None), pass
583    test_support's belief about verbosity on to doctest.  Else doctest's
584    usual behavior is used (it searches sys.argv for -v).
585    """
586
587    import doctest
588
589    if verbosity is None:
590        verbosity = verbose
591    else:
592        verbosity = None
593
594    # Direct doctest output (normally just errors) to real stdout; doctest
595    # output shouldn't be compared by regrtest.
596    save_stdout = sys.stdout
597    sys.stdout = get_original_stdout()
598    try:
599        f, t = doctest.testmod(module, verbose=verbosity)
600        if f:
601            raise TestFailed("%d of %d doctests failed" % (f, t))
602    finally:
603        sys.stdout = save_stdout
604    if verbose:
605        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
606    return f, t
607
608#=======================================================================
609# Threading support to prevent reporting refleaks when running regrtest.py -R
610
611def threading_setup():
612    import threading
613    return len(threading._active), len(threading._limbo)
614
615def threading_cleanup(num_active, num_limbo):
616    import threading
617    import time
618
619    _MAX_COUNT = 10
620    count = 0
621    while len(threading._active) != num_active and count < _MAX_COUNT:
622        count += 1
623        time.sleep(0.1)
624
625    count = 0
626    while len(threading._limbo) != num_limbo and count < _MAX_COUNT:
627        count += 1
628        time.sleep(0.1)
629
630def reap_children():
631    """Use this function at the end of test_main() whenever sub-processes
632    are started.  This will help ensure that no extra children (zombies)
633    stick around to hog resources and create problems when looking
634    for refleaks.
635    """
636
637    # Reap all our dead child processes so we don't leave zombies around.
638    # These hog resources and might be causing some of the buildbots to die.
639    if hasattr(os, 'waitpid'):
640        any_process = -1
641        while True:
642            try:
643                # This will raise an exception on Windows.  That's ok.
644                pid, status = os.waitpid(any_process, os.WNOHANG)
645                if pid == 0:
646                    break
647            except:
648                break
649