1# tests __main__ module handling in multiprocessing 2from test import support 3# Skip tests if _thread or _multiprocessing wasn't built. 4support.import_module('_thread') 5support.import_module('_multiprocessing') 6 7import importlib 8import importlib.machinery 9import unittest 10import sys 11import os 12import os.path 13import py_compile 14 15from test.support.script_helper import ( 16 make_pkg, make_script, make_zip_pkg, make_zip_script, 17 assert_python_ok) 18 19if support.PGO: 20 raise unittest.SkipTest("test is not helpful for PGO") 21 22# Look up which start methods are available to test 23import multiprocessing 24AVAILABLE_START_METHODS = set(multiprocessing.get_all_start_methods()) 25 26# Issue #22332: Skip tests if sem_open implementation is broken. 27support.import_module('multiprocessing.synchronize') 28 29verbose = support.verbose 30 31test_source = """\ 32# multiprocessing includes all sorts of shenanigans to make __main__ 33# attributes accessible in the subprocess in a pickle compatible way. 34 35# We run the "doesn't work in the interactive interpreter" example from 36# the docs to make sure it *does* work from an executed __main__, 37# regardless of the invocation mechanism 38 39import sys 40import time 41from multiprocessing import Pool, set_start_method 42 43# We use this __main__ defined function in the map call below in order to 44# check that multiprocessing in correctly running the unguarded 45# code in child processes and then making it available as __main__ 46def f(x): 47 return x*x 48 49# Check explicit relative imports 50if "check_sibling" in __file__: 51 # We're inside a package and not in a __main__.py file 52 # so make sure explicit relative imports work correctly 53 from . import sibling 54 55if __name__ == '__main__': 56 start_method = sys.argv[1] 57 set_start_method(start_method) 58 p = Pool(5) 59 results = [] 60 p.map_async(f, [1, 2, 3], callback=results.extend) 61 deadline = time.time() + 10 # up to 10 s to report the results 62 while not results: 63 time.sleep(0.05) 64 if time.time() > deadline: 65 raise RuntimeError("Timed out waiting for results") 66 results.sort() 67 print(start_method, "->", results) 68""" 69 70test_source_main_skipped_in_children = """\ 71# __main__.py files have an implied "if __name__ == '__main__'" so 72# multiprocessing should always skip running them in child processes 73 74# This means we can't use __main__ defined functions in child processes, 75# so we just use "int" as a passthrough operation below 76 77if __name__ != "__main__": 78 raise RuntimeError("Should only be called as __main__!") 79 80import sys 81import time 82from multiprocessing import Pool, set_start_method 83 84start_method = sys.argv[1] 85set_start_method(start_method) 86p = Pool(5) 87results = [] 88p.map_async(int, [1, 4, 9], callback=results.extend) 89deadline = time.time() + 10 # up to 10 s to report the results 90while not results: 91 time.sleep(0.05) 92 if time.time() > deadline: 93 raise RuntimeError("Timed out waiting for results") 94results.sort() 95print(start_method, "->", results) 96""" 97 98# These helpers were copied from test_cmd_line_script & tweaked a bit... 99 100def _make_test_script(script_dir, script_basename, 101 source=test_source, omit_suffix=False): 102 to_return = make_script(script_dir, script_basename, 103 source, omit_suffix) 104 # Hack to check explicit relative imports 105 if script_basename == "check_sibling": 106 make_script(script_dir, "sibling", "") 107 importlib.invalidate_caches() 108 return to_return 109 110def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, 111 source=test_source, depth=1): 112 to_return = make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, 113 source, depth) 114 importlib.invalidate_caches() 115 return to_return 116 117# There's no easy way to pass the script directory in to get 118# -m to work (avoiding that is the whole point of making 119# directories and zipfiles executable!) 120# So we fake it for testing purposes with a custom launch script 121launch_source = """\ 122import sys, os.path, runpy 123sys.path.insert(0, %s) 124runpy._run_module_as_main(%r) 125""" 126 127def _make_launch_script(script_dir, script_basename, module_name, path=None): 128 if path is None: 129 path = "os.path.dirname(__file__)" 130 else: 131 path = repr(path) 132 source = launch_source % (path, module_name) 133 to_return = make_script(script_dir, script_basename, source) 134 importlib.invalidate_caches() 135 return to_return 136 137class MultiProcessingCmdLineMixin(): 138 maxDiff = None # Show full tracebacks on subprocess failure 139 140 def setUp(self): 141 if self.start_method not in AVAILABLE_START_METHODS: 142 self.skipTest("%r start method not available" % self.start_method) 143 144 def _check_output(self, script_name, exit_code, out, err): 145 if verbose > 1: 146 print("Output from test script %r:" % script_name) 147 print(repr(out)) 148 self.assertEqual(exit_code, 0) 149 self.assertEqual(err.decode('utf-8'), '') 150 expected_results = "%s -> [1, 4, 9]" % self.start_method 151 self.assertEqual(out.decode('utf-8').strip(), expected_results) 152 153 def _check_script(self, script_name, *cmd_line_switches): 154 if not __debug__: 155 cmd_line_switches += ('-' + 'O' * sys.flags.optimize,) 156 run_args = cmd_line_switches + (script_name, self.start_method) 157 rc, out, err = assert_python_ok(*run_args, __isolated=False) 158 self._check_output(script_name, rc, out, err) 159 160 def test_basic_script(self): 161 with support.temp_dir() as script_dir: 162 script_name = _make_test_script(script_dir, 'script') 163 self._check_script(script_name) 164 165 def test_basic_script_no_suffix(self): 166 with support.temp_dir() as script_dir: 167 script_name = _make_test_script(script_dir, 'script', 168 omit_suffix=True) 169 self._check_script(script_name) 170 171 def test_ipython_workaround(self): 172 # Some versions of the IPython launch script are missing the 173 # __name__ = "__main__" guard, and multiprocessing has long had 174 # a workaround for that case 175 # See https://github.com/ipython/ipython/issues/4698 176 source = test_source_main_skipped_in_children 177 with support.temp_dir() as script_dir: 178 script_name = _make_test_script(script_dir, 'ipython', 179 source=source) 180 self._check_script(script_name) 181 script_no_suffix = _make_test_script(script_dir, 'ipython', 182 source=source, 183 omit_suffix=True) 184 self._check_script(script_no_suffix) 185 186 def test_script_compiled(self): 187 with support.temp_dir() as script_dir: 188 script_name = _make_test_script(script_dir, 'script') 189 py_compile.compile(script_name, doraise=True) 190 os.remove(script_name) 191 pyc_file = support.make_legacy_pyc(script_name) 192 self._check_script(pyc_file) 193 194 def test_directory(self): 195 source = self.main_in_children_source 196 with support.temp_dir() as script_dir: 197 script_name = _make_test_script(script_dir, '__main__', 198 source=source) 199 self._check_script(script_dir) 200 201 def test_directory_compiled(self): 202 source = self.main_in_children_source 203 with support.temp_dir() as script_dir: 204 script_name = _make_test_script(script_dir, '__main__', 205 source=source) 206 py_compile.compile(script_name, doraise=True) 207 os.remove(script_name) 208 pyc_file = support.make_legacy_pyc(script_name) 209 self._check_script(script_dir) 210 211 def test_zipfile(self): 212 source = self.main_in_children_source 213 with support.temp_dir() as script_dir: 214 script_name = _make_test_script(script_dir, '__main__', 215 source=source) 216 zip_name, run_name = make_zip_script(script_dir, 'test_zip', script_name) 217 self._check_script(zip_name) 218 219 def test_zipfile_compiled(self): 220 source = self.main_in_children_source 221 with support.temp_dir() as script_dir: 222 script_name = _make_test_script(script_dir, '__main__', 223 source=source) 224 compiled_name = py_compile.compile(script_name, doraise=True) 225 zip_name, run_name = make_zip_script(script_dir, 'test_zip', compiled_name) 226 self._check_script(zip_name) 227 228 def test_module_in_package(self): 229 with support.temp_dir() as script_dir: 230 pkg_dir = os.path.join(script_dir, 'test_pkg') 231 make_pkg(pkg_dir) 232 script_name = _make_test_script(pkg_dir, 'check_sibling') 233 launch_name = _make_launch_script(script_dir, 'launch', 234 'test_pkg.check_sibling') 235 self._check_script(launch_name) 236 237 def test_module_in_package_in_zipfile(self): 238 with support.temp_dir() as script_dir: 239 zip_name, run_name = _make_test_zip_pkg(script_dir, 'test_zip', 'test_pkg', 'script') 240 launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg.script', zip_name) 241 self._check_script(launch_name) 242 243 def test_module_in_subpackage_in_zipfile(self): 244 with support.temp_dir() as script_dir: 245 zip_name, run_name = _make_test_zip_pkg(script_dir, 'test_zip', 'test_pkg', 'script', depth=2) 246 launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg.test_pkg.script', zip_name) 247 self._check_script(launch_name) 248 249 def test_package(self): 250 source = self.main_in_children_source 251 with support.temp_dir() as script_dir: 252 pkg_dir = os.path.join(script_dir, 'test_pkg') 253 make_pkg(pkg_dir) 254 script_name = _make_test_script(pkg_dir, '__main__', 255 source=source) 256 launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg') 257 self._check_script(launch_name) 258 259 def test_package_compiled(self): 260 source = self.main_in_children_source 261 with support.temp_dir() as script_dir: 262 pkg_dir = os.path.join(script_dir, 'test_pkg') 263 make_pkg(pkg_dir) 264 script_name = _make_test_script(pkg_dir, '__main__', 265 source=source) 266 compiled_name = py_compile.compile(script_name, doraise=True) 267 os.remove(script_name) 268 pyc_file = support.make_legacy_pyc(script_name) 269 launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg') 270 self._check_script(launch_name) 271 272# Test all supported start methods (setupClass skips as appropriate) 273 274class SpawnCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase): 275 start_method = 'spawn' 276 main_in_children_source = test_source_main_skipped_in_children 277 278class ForkCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase): 279 start_method = 'fork' 280 main_in_children_source = test_source 281 282class ForkServerCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase): 283 start_method = 'forkserver' 284 main_in_children_source = test_source_main_skipped_in_children 285 286def tearDownModule(): 287 support.reap_children() 288 289if __name__ == '__main__': 290 unittest.main() 291