1# Common utility functions used by various script execution tests
2#  e.g. test_cmd_line, test_cmd_line_script and test_runpy
3
4import collections
5import importlib
6import sys
7import os
8import os.path
9import tempfile
10import subprocess
11import py_compile
12import contextlib
13import shutil
14import zipfile
15
16from importlib.util import source_from_cache
17from test.support import make_legacy_pyc, strip_python_stderr
18
19
# Memoized answer of interpreter_requires_environment().  None means the
# probe has not been run yet; afterwards it holds True or False.
__cached_interp_requires_environment = None

def interpreter_requires_environment():
    """
    Tell whether sys.executable is unable to start without its environment
    variables.

    Intended for @unittest.skipIf() on tests that launch a sub-interpreter
    in isolated mode (-I) or no-environment mode (-E) through one of the
    assert_python*() helpers.

    Regular builds are not affected.  The situation arises when the test
    suite is run from an interpreter whose home cannot be located by
    Python's home finding logic; PYTHONHOME (or PYTHONPATH, PYTHONUSERSITE)
    is then typically what allows the interpreter to start at all.

    The probe spawns a child process, so its outcome is computed once and
    kept in the module-level __cached_interp_requires_environment.
    """
    global __cached_interp_requires_environment
    if __cached_interp_requires_environment is not None:
        return __cached_interp_requires_environment

    # Launch a trivial program with -E: a non-zero exit status means the
    # interpreter cannot run once its environment variables are ignored.
    probe = [sys.executable, '-E', '-c', 'import sys; sys.exit(0)']
    try:
        subprocess.check_call(probe)
    except subprocess.CalledProcessError:
        needs_environment = True
    else:
        needs_environment = False

    __cached_interp_requires_environment = needs_environment
    return needs_environment
52
53
# Record describing a finished child interpreter: its return code followed
# by what was collected for stdout and stderr.
_PythonRunResult = collections.namedtuple("_PythonRunResult", "rc out err")
56
57
58# Executing the interpreter in a subprocess
def run_python_until_end(*args, **env_vars):
    """Run sys.executable with `args` and wait until the child has exited.

    Keyword arguments are environment variables for the child, except for
    two control keywords which are removed before the environment is built:

    __isolated -- when given, its truth value decides whether -I is passed.
    __cleanenv -- when true, the child starts from an empty environment
                  (plus SYSTEMROOT on Windows) rather than from a copy of
                  os.environ.

    Return a pair (result, cmd_line): result is a _PythonRunResult whose
    err field went through strip_python_stderr(), and cmd_line is the
    argument list that was executed.
    """
    needs_env = interpreter_requires_environment()

    isolated_given = '__isolated' in env_vars
    if isolated_given:
        isolated = env_vars.pop('__isolated')
    # True when the caller passed nothing besides __isolated and the
    # interpreter is able to start without its environment variables.
    env_ignorable = not env_vars and not needs_env
    if not isolated_given:
        isolated = env_ignorable

    cmd_line = [sys.executable, '-X', 'faulthandler']
    if isolated:
        # -I: ignore Python environment variables and the user
        # site-packages, and keep the current directory off sys.path.
        cmd_line.append('-I')
    elif env_ignorable:
        # -E: only ignore Python environment variables.
        cmd_line.append('-E')

    # __cleanenv overrides the default of inheriting the environment; the
    # caller is then responsible for supplying everything the child needs.
    if env_vars.pop('__cleanenv', None):
        child_env = {}
        if sys.platform == 'win32':
            # Python cannot start on Windows without SYSTEMROOT.
            child_env['SYSTEMROOT'] = os.environ['SYSTEMROOT']
        # Not copied for now: COMSPEC, HOME, PATH, TEMP, TMPDIR, TMP.
    else:
        # The original environment is required for in-place testing of
        # shared library builds.
        child_env = os.environ.copy()

    # TERM is blanked unless given explicitly (issues #11390 and #18300).
    if 'TERM' not in env_vars:
        child_env['TERM'] = ''
    child_env.update(env_vars)

    cmd_line += args
    child = subprocess.Popen(cmd_line, env=child_env,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
    with child:
        try:
            stdout_data, stderr_data = child.communicate()
        finally:
            child.kill()
            subprocess._cleanup()

    result = _PythonRunResult(child.returncode, stdout_data,
                              strip_python_stderr(stderr_data))
    return result, cmd_line
109
def _assert_python(expected_success, *args, **env_vars):
    """Run the interpreter and check its return code against an expectation.

    `args` and `env_vars` are forwarded to run_python_until_end().  The run
    result is returned when a zero return code coincides with a true
    `expected_success` (or a non-zero one with a false value); otherwise an
    AssertionError describing the return code, command line, stdout and
    stderr is raised.
    """
    res, cmd_line = run_python_until_end(*args, **env_vars)
    if bool(res.rc) != bool(expected_success):
        # The outcome is the expected one.
        return res

    # Only the last 80 * 100 bytes of each stream are reported.
    limit = 80 * 100

    def render(stream, marker):
        # Keep the tail of an over-long stream, then make it printable.
        if len(stream) > limit:
            stream = marker + stream[-limit:]
        return stream.decode('ascii', 'replace').rstrip()

    out = render(res.out, b'(... truncated stdout ...)')
    err = render(res.err, b'(... truncated stderr ...)')
    template = ("Process return code is %d\n"
                "command line: %r\n"
                "\n"
                "stdout:\n"
                "---\n"
                "%s\n"
                "---\n"
                "\n"
                "stderr:\n"
                "---\n"
                "%s\n"
                "---")
    raise AssertionError(template % (res.rc, cmd_line, out, err))
138
def assert_python_ok(*args, **env_vars):
    """
    Assert that running the interpreter with `args` and optional environment
    variables `env_vars` succeeds (rc == 0) and return a (return code, stdout,
    stderr) tuple.

    An AssertionError reporting the return code, command line, stdout and
    stderr of the child is raised if the return code is non-zero.

    If the __cleanenv keyword is set, env_vars is used as a fresh environment
    instead of being merged into a copy of os.environ.

    Python is started in isolated mode (command line option -I) when the
    __isolated keyword is true.  When __isolated is not given, -I is used
    only if no other keyword is passed and the interpreter does not need
    environment variables to start (see interpreter_requires_environment()).
    """
    return _assert_python(True, *args, **env_vars)
151
def assert_python_failure(*args, **env_vars):
    """
    Assert that running the interpreter with `args` and optional environment
    variables `env_vars` fails (rc != 0) and return a (return code, stdout,
    stderr) tuple.

    An AssertionError reporting the command line, stdout and stderr of the
    child is raised if the process exits with return code 0.

    See assert_python_ok() for more options.
    """
    return _assert_python(False, *args, **env_vars)
161
def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
    """Run a Python subprocess with the given arguments.

    The child is started as ``sys.executable -E <args>`` with stdin
    connected to a pipe.  `stdout` and `stderr` are passed to
    subprocess.Popen as given (by default stderr is merged into a stdout
    pipe).

    kw is extra keyword args to pass to subprocess.Popen.  If it contains
    an `env` mapping, the child receives a copy of it with TERM forced to
    'vt100'; the caller's mapping is left untouched.  Without `env` (or
    with env=None) a copy of os.environ is used.

    Returns a Popen object.
    """
    cmd_line = [sys.executable, '-E']
    cmd_line.extend(args)
    # Under Fedora (?), GNU readline can output junk on stderr when initialized,
    # depending on the TERM setting.  Setting TERM=vt100 is supposed to disable
    # that.  References:
    # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html
    # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import
    # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html
    caller_env = kw.get('env')
    # Work on a private copy: writing TERM into the caller's mapping would
    # leak into later spawns, or into this very process if os.environ
    # itself was passed.
    env = dict(os.environ if caller_env is None else caller_env)
    env['TERM'] = 'vt100'
    kw['env'] = env
    return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                            stdout=stdout, stderr=stderr,
                            **kw)
181
def kill_python(p):
    """Run the given Popen process until completion and return stdout.

    `p` must have been created with pipes for stdin and stdout (as
    spawn_python() does by default).  stdin is closed, stdout is read to
    the end and closed, and the child is waited for.

    The stdout pipe is closed and the child is reaped even if closing
    stdin or reading stdout raises; the exception then propagates.
    """
    try:
        p.stdin.close()
        try:
            data = p.stdout.read()
        finally:
            p.stdout.close()
    finally:
        # try to cleanup the child so we don't appear to leak when running
        # with regrtest -R.
        p.wait()
        subprocess._cleanup()
    return data
192
def make_script(script_dir, script_basename, source, omit_suffix=False):
    """Write `source` to a script file inside `script_dir`.

    The file is named `script_basename` with a '.py' suffix appended,
    unless `omit_suffix` is true, in which case the basename is used as is.
    The text is encoded as UTF-8.  Import system caches are invalidated
    afterwards so that the new file is visible to subsequent imports.

    Returns the path of the file that was written.
    """
    script_filename = script_basename
    if not omit_suffix:
        script_filename += os.extsep + 'py'
    script_name = os.path.join(script_dir, script_filename)
    # The script should be encoded to UTF-8, the default string encoding.
    # The with statement closes the file even if the write fails.
    with open(script_name, 'w', encoding='utf-8') as script_file:
        script_file.write(source)
    importlib.invalidate_caches()
    return script_name
204
def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
    """Create `zip_basename`.zip in `zip_dir` containing one script.

    `script_name` is the path of the file to store.  `name_in_zip` is the
    name it gets inside the archive; when omitted, the basename of
    `script_name` is used.  If `name_in_zip` is omitted and `script_name`
    lives in a __pycache__ directory, the file is first converted with
    make_legacy_pyc() and the resulting legacy pyc file is stored under its
    own basename.

    Returns a pair (zip_name, run_name): the path of the archive and the
    path of the stored script within it (archive path joined with the
    name in the archive).
    """
    zip_filename = zip_basename + os.extsep + 'zip'
    zip_name = os.path.join(zip_dir, zip_filename)
    # Settle what gets stored before creating the archive, so a failure
    # here does not leave a half-initialized zip file behind.
    if name_in_zip is None:
        parts = script_name.split(os.sep)
        if len(parts) >= 2 and parts[-2] == '__pycache__':
            legacy_pyc = make_legacy_pyc(source_from_cache(script_name))
            name_in_zip = os.path.basename(legacy_pyc)
            script_name = legacy_pyc
        else:
            name_in_zip = os.path.basename(script_name)
    # The with statement closes the archive even if the write fails.
    with zipfile.ZipFile(zip_name, 'w') as zip_file:
        zip_file.write(script_name, name_in_zip)
    return zip_name, os.path.join(zip_name, name_in_zip)
225
def make_pkg(pkg_dir, init_source=''):
    """Create the directory `pkg_dir` and write an __init__.py into it.

    `init_source` is the text stored in __init__.py (empty by default).
    The directory is created with os.mkdir(), so any error it raises (for
    instance when the directory already exists) propagates to the caller.
    """
    os.mkdir(pkg_dir)
    make_script(pkg_dir, '__init__', init_source)
229
def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
                 source, depth=1, compiled=False):
    """Create `zip_basename`.zip in `zip_dir` holding a (nested) package.

    The archive contains `depth` nested directories, each named `pkg_name`
    (pkg_name, pkg_name/pkg_name, ...), every one with an empty __init__
    module.  A script built from `source` and named after `script_basename`
    is stored in the innermost directory.

    If `compiled` is true, both files are byte-compiled with py_compile and
    the compiled files are what gets stored.

    The intermediate files written to `zip_dir` are removed before
    returning, including when building the archive fails.

    Returns a pair (zip_name, run_name): the path of the archive and the
    path of the script within it.
    """
    unlink = []
    try:
        init_name = make_script(zip_dir, '__init__', '')
        unlink.append(init_name)
        init_basename = os.path.basename(init_name)
        script_name = make_script(zip_dir, script_basename, source)
        unlink.append(script_name)
        if compiled:
            # NOTE(review): init_basename was taken from the source file
            # above, so the package __init__ entries keep that name even
            # though their content is the compiled file -- confirm this is
            # intended.
            init_name = py_compile.compile(init_name, doraise=True)
            unlink.append(init_name)
            script_name = py_compile.compile(script_name, doraise=True)
            unlink.append(script_name)
        pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)]
        script_name_in_zip = os.path.join(pkg_names[-1],
                                          os.path.basename(script_name))
        zip_filename = zip_basename + os.extsep + 'zip'
        zip_name = os.path.join(zip_dir, zip_filename)
        # The with statement closes the archive even if a write fails.
        with zipfile.ZipFile(zip_name, 'w') as zip_file:
            for name in pkg_names:
                init_name_in_zip = os.path.join(name, init_basename)
                zip_file.write(init_name, init_name_in_zip)
            zip_file.write(script_name, script_name_in_zip)
    finally:
        # Always remove the intermediate files, whatever happened above.
        for name in unlink:
            os.unlink(name)
    return zip_name, os.path.join(zip_name, script_name_in_zip)
260