test_subprocess.py revision 860593653b119f16791456398b1dadcdb71aa9aa
1import unittest 2from test import test_support 3import subprocess 4import sys 5import signal 6import os 7import errno 8import tempfile 9import time 10import re 11import sysconfig 12 13mswindows = (sys.platform == "win32") 14 15# 16# Depends on the following external programs: Python 17# 18 19if mswindows: 20 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' 21 'os.O_BINARY);') 22else: 23 SETBINARY = '' 24 25 26try: 27 mkstemp = tempfile.mkstemp 28except AttributeError: 29 # tempfile.mkstemp is not available 30 def mkstemp(): 31 """Replacement for mkstemp, calling mktemp.""" 32 fname = tempfile.mktemp() 33 return os.open(fname, os.O_RDWR|os.O_CREAT), fname 34 35 36class BaseTestCase(unittest.TestCase): 37 def setUp(self): 38 # Try to minimize the number of children we have so this test 39 # doesn't crash on some buildbots (Alphas in particular). 40 test_support.reap_children() 41 42 def tearDown(self): 43 for inst in subprocess._active: 44 inst.wait() 45 subprocess._cleanup() 46 self.assertFalse(subprocess._active, "subprocess._active not empty") 47 48 def assertStderrEqual(self, stderr, expected, msg=None): 49 # In a debug build, stuff like "[6580 refs]" is printed to stderr at 50 # shutdown time. That frustrates tests trying to check stderr produced 51 # from a spawned Python process. 52 actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr) 53 self.assertEqual(actual, expected, msg) 54 55 56class ProcessTestCase(BaseTestCase): 57 58 def test_call_seq(self): 59 # call() function with sequence argument 60 rc = subprocess.call([sys.executable, "-c", 61 "import sys; sys.exit(47)"]) 62 self.assertEqual(rc, 47) 63 64 def test_check_call_zero(self): 65 # check_call() function with zero return code 66 rc = subprocess.check_call([sys.executable, "-c", 67 "import sys; sys.exit(0)"]) 68 self.assertEqual(rc, 0) 69 70 def test_check_call_nonzero(self): 71 # check_call() function with non-zero return code 72 with self.assertRaises(subprocess.CalledProcessError) as c: 73 subprocess.check_call([sys.executable, "-c", 74 "import sys; sys.exit(47)"]) 75 self.assertEqual(c.exception.returncode, 47) 76 77 def test_check_output(self): 78 # check_output() function with zero return code 79 output = subprocess.check_output( 80 [sys.executable, "-c", "print 'BDFL'"]) 81 self.assertIn('BDFL', output) 82 83 def test_check_output_nonzero(self): 84 # check_call() function with non-zero return code 85 with self.assertRaises(subprocess.CalledProcessError) as c: 86 subprocess.check_output( 87 [sys.executable, "-c", "import sys; sys.exit(5)"]) 88 self.assertEqual(c.exception.returncode, 5) 89 90 def test_check_output_stderr(self): 91 # check_output() function stderr redirected to stdout 92 output = subprocess.check_output( 93 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], 94 stderr=subprocess.STDOUT) 95 self.assertIn('BDFL', output) 96 97 def test_check_output_stdout_arg(self): 98 # check_output() function stderr redirected to stdout 99 with self.assertRaises(ValueError) as c: 100 output = subprocess.check_output( 101 [sys.executable, "-c", "print 'will not be run'"], 102 stdout=sys.stdout) 103 self.fail("Expected ValueError when stdout arg supplied.") 104 self.assertIn('stdout', c.exception.args[0]) 105 106 def test_call_kwargs(self): 107 # call() function with keyword args 108 newenv = os.environ.copy() 109 newenv["FRUIT"] = "banana" 110 rc = subprocess.call([sys.executable, "-c", 111 'import sys, os;' 112 'sys.exit(os.getenv("FRUIT")=="banana")'], 113 env=newenv) 114 self.assertEqual(rc, 1) 115 116 def test_invalid_args(self): 117 # Popen() called with invalid arguments should raise TypeError 118 # but Popen.__del__ should not complain (issue #12085) 119 with test_support.captured_stderr() as s: 120 self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1) 121 argcount = subprocess.Popen.__init__.__code__.co_argcount 122 too_many_args = [0] * (argcount + 1) 123 self.assertRaises(TypeError, subprocess.Popen, *too_many_args) 124 self.assertEqual(s.getvalue(), '') 125 126 def test_stdin_none(self): 127 # .stdin is None when not redirected 128 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], 129 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 130 self.addCleanup(p.stdout.close) 131 self.addCleanup(p.stderr.close) 132 p.wait() 133 self.assertEqual(p.stdin, None) 134 135 def test_stdout_none(self): 136 # .stdout is None when not redirected 137 p = subprocess.Popen([sys.executable, "-c", 138 'print " this bit of output is from a ' 139 'test of stdout in a different ' 140 'process ..."'], 141 stdin=subprocess.PIPE, stderr=subprocess.PIPE) 142 self.addCleanup(p.stdin.close) 143 self.addCleanup(p.stderr.close) 144 p.wait() 145 self.assertEqual(p.stdout, None) 146 147 def test_stderr_none(self): 148 # .stderr is None when not redirected 149 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'], 150 stdin=subprocess.PIPE, stdout=subprocess.PIPE) 151 self.addCleanup(p.stdout.close) 152 self.addCleanup(p.stdin.close) 153 p.wait() 154 self.assertEqual(p.stderr, None) 155 156 def test_executable_with_cwd(self): 157 python_dir = os.path.dirname(os.path.realpath(sys.executable)) 158 p = subprocess.Popen(["somethingyoudonthave", "-c", 159 "import sys; sys.exit(47)"], 160 executable=sys.executable, cwd=python_dir) 161 p.wait() 162 self.assertEqual(p.returncode, 47) 163 164 @unittest.skipIf(sysconfig.is_python_build(), 165 "need an installed Python. See #7774") 166 def test_executable_without_cwd(self): 167 # For a normal installation, it should work without 'cwd' 168 # argument. For test runs in the build directory, see #7774. 169 p = subprocess.Popen(["somethingyoudonthave", "-c", 170 "import sys; sys.exit(47)"], 171 executable=sys.executable) 172 p.wait() 173 self.assertEqual(p.returncode, 47) 174 175 def test_stdin_pipe(self): 176 # stdin redirection 177 p = subprocess.Popen([sys.executable, "-c", 178 'import sys; sys.exit(sys.stdin.read() == "pear")'], 179 stdin=subprocess.PIPE) 180 p.stdin.write("pear") 181 p.stdin.close() 182 p.wait() 183 self.assertEqual(p.returncode, 1) 184 185 def test_stdin_filedes(self): 186 # stdin is set to open file descriptor 187 tf = tempfile.TemporaryFile() 188 d = tf.fileno() 189 os.write(d, "pear") 190 os.lseek(d, 0, 0) 191 p = subprocess.Popen([sys.executable, "-c", 192 'import sys; sys.exit(sys.stdin.read() == "pear")'], 193 stdin=d) 194 p.wait() 195 self.assertEqual(p.returncode, 1) 196 197 def test_stdin_fileobj(self): 198 # stdin is set to open file object 199 tf = tempfile.TemporaryFile() 200 tf.write("pear") 201 tf.seek(0) 202 p = subprocess.Popen([sys.executable, "-c", 203 'import sys; sys.exit(sys.stdin.read() == "pear")'], 204 stdin=tf) 205 p.wait() 206 self.assertEqual(p.returncode, 1) 207 208 def test_stdout_pipe(self): 209 # stdout redirection 210 p = subprocess.Popen([sys.executable, "-c", 211 'import sys; sys.stdout.write("orange")'], 212 stdout=subprocess.PIPE) 213 self.addCleanup(p.stdout.close) 214 self.assertEqual(p.stdout.read(), "orange") 215 216 def test_stdout_filedes(self): 217 # stdout is set to open file descriptor 218 tf = tempfile.TemporaryFile() 219 d = tf.fileno() 220 p = subprocess.Popen([sys.executable, "-c", 221 'import sys; sys.stdout.write("orange")'], 222 stdout=d) 223 p.wait() 224 os.lseek(d, 0, 0) 225 self.assertEqual(os.read(d, 1024), "orange") 226 227 def test_stdout_fileobj(self): 228 # stdout is set to open file object 229 tf = tempfile.TemporaryFile() 230 p = subprocess.Popen([sys.executable, "-c", 231 'import sys; sys.stdout.write("orange")'], 232 stdout=tf) 233 p.wait() 234 tf.seek(0) 235 self.assertEqual(tf.read(), "orange") 236 237 def test_stderr_pipe(self): 238 # stderr redirection 239 p = subprocess.Popen([sys.executable, "-c", 240 'import sys; sys.stderr.write("strawberry")'], 241 stderr=subprocess.PIPE) 242 self.addCleanup(p.stderr.close) 243 self.assertStderrEqual(p.stderr.read(), "strawberry") 244 245 def test_stderr_filedes(self): 246 # stderr is set to open file descriptor 247 tf = tempfile.TemporaryFile() 248 d = tf.fileno() 249 p = subprocess.Popen([sys.executable, "-c", 250 'import sys; sys.stderr.write("strawberry")'], 251 stderr=d) 252 p.wait() 253 os.lseek(d, 0, 0) 254 self.assertStderrEqual(os.read(d, 1024), "strawberry") 255 256 def test_stderr_fileobj(self): 257 # stderr is set to open file object 258 tf = tempfile.TemporaryFile() 259 p = subprocess.Popen([sys.executable, "-c", 260 'import sys; sys.stderr.write("strawberry")'], 261 stderr=tf) 262 p.wait() 263 tf.seek(0) 264 self.assertStderrEqual(tf.read(), "strawberry") 265 266 def test_stdout_stderr_pipe(self): 267 # capture stdout and stderr to the same pipe 268 p = subprocess.Popen([sys.executable, "-c", 269 'import sys;' 270 'sys.stdout.write("apple");' 271 'sys.stdout.flush();' 272 'sys.stderr.write("orange")'], 273 stdout=subprocess.PIPE, 274 stderr=subprocess.STDOUT) 275 self.addCleanup(p.stdout.close) 276 self.assertStderrEqual(p.stdout.read(), "appleorange") 277 278 def test_stdout_stderr_file(self): 279 # capture stdout and stderr to the same open file 280 tf = tempfile.TemporaryFile() 281 p = subprocess.Popen([sys.executable, "-c", 282 'import sys;' 283 'sys.stdout.write("apple");' 284 'sys.stdout.flush();' 285 'sys.stderr.write("orange")'], 286 stdout=tf, 287 stderr=tf) 288 p.wait() 289 tf.seek(0) 290 self.assertStderrEqual(tf.read(), "appleorange") 291 292 def test_stdout_filedes_of_stdout(self): 293 # stdout is set to 1 (#1531862). 294 cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))" 295 rc = subprocess.call([sys.executable, "-c", cmd], stdout=1) 296 self.assertEqual(rc, 2) 297 298 def test_cwd(self): 299 tmpdir = tempfile.gettempdir() 300 # We cannot use os.path.realpath to canonicalize the path, 301 # since it doesn't expand Tru64 {memb} strings. See bug 1063571. 302 cwd = os.getcwd() 303 os.chdir(tmpdir) 304 tmpdir = os.getcwd() 305 os.chdir(cwd) 306 p = subprocess.Popen([sys.executable, "-c", 307 'import sys,os;' 308 'sys.stdout.write(os.getcwd())'], 309 stdout=subprocess.PIPE, 310 cwd=tmpdir) 311 self.addCleanup(p.stdout.close) 312 normcase = os.path.normcase 313 self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir)) 314 315 def test_env(self): 316 newenv = os.environ.copy() 317 newenv["FRUIT"] = "orange" 318 p = subprocess.Popen([sys.executable, "-c", 319 'import sys,os;' 320 'sys.stdout.write(os.getenv("FRUIT"))'], 321 stdout=subprocess.PIPE, 322 env=newenv) 323 self.addCleanup(p.stdout.close) 324 self.assertEqual(p.stdout.read(), "orange") 325 326 def test_communicate_stdin(self): 327 p = subprocess.Popen([sys.executable, "-c", 328 'import sys;' 329 'sys.exit(sys.stdin.read() == "pear")'], 330 stdin=subprocess.PIPE) 331 p.communicate("pear") 332 self.assertEqual(p.returncode, 1) 333 334 def test_communicate_stdout(self): 335 p = subprocess.Popen([sys.executable, "-c", 336 'import sys; sys.stdout.write("pineapple")'], 337 stdout=subprocess.PIPE) 338 (stdout, stderr) = p.communicate() 339 self.assertEqual(stdout, "pineapple") 340 self.assertEqual(stderr, None) 341 342 def test_communicate_stderr(self): 343 p = subprocess.Popen([sys.executable, "-c", 344 'import sys; sys.stderr.write("pineapple")'], 345 stderr=subprocess.PIPE) 346 (stdout, stderr) = p.communicate() 347 self.assertEqual(stdout, None) 348 self.assertStderrEqual(stderr, "pineapple") 349 350 def test_communicate(self): 351 p = subprocess.Popen([sys.executable, "-c", 352 'import sys,os;' 353 'sys.stderr.write("pineapple");' 354 'sys.stdout.write(sys.stdin.read())'], 355 stdin=subprocess.PIPE, 356 stdout=subprocess.PIPE, 357 stderr=subprocess.PIPE) 358 self.addCleanup(p.stdout.close) 359 self.addCleanup(p.stderr.close) 360 self.addCleanup(p.stdin.close) 361 (stdout, stderr) = p.communicate("banana") 362 self.assertEqual(stdout, "banana") 363 self.assertStderrEqual(stderr, "pineapple") 364 365 # This test is Linux specific for simplicity to at least have 366 # some coverage. It is not a platform specific bug. 367 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 368 "Linux specific") 369 # Test for the fd leak reported in http://bugs.python.org/issue2791. 370 def test_communicate_pipe_fd_leak(self): 371 fd_directory = '/proc/%d/fd' % os.getpid() 372 num_fds_before_popen = len(os.listdir(fd_directory)) 373 p = subprocess.Popen([sys.executable, "-c", "print()"], 374 stdout=subprocess.PIPE) 375 p.communicate() 376 num_fds_after_communicate = len(os.listdir(fd_directory)) 377 del p 378 num_fds_after_destruction = len(os.listdir(fd_directory)) 379 self.assertEqual(num_fds_before_popen, num_fds_after_destruction) 380 self.assertEqual(num_fds_before_popen, num_fds_after_communicate) 381 382 def test_communicate_returns(self): 383 # communicate() should return None if no redirection is active 384 p = subprocess.Popen([sys.executable, "-c", 385 "import sys; sys.exit(47)"]) 386 (stdout, stderr) = p.communicate() 387 self.assertEqual(stdout, None) 388 self.assertEqual(stderr, None) 389 390 def test_communicate_pipe_buf(self): 391 # communicate() with writes larger than pipe_buf 392 # This test will probably deadlock rather than fail, if 393 # communicate() does not work properly. 394 x, y = os.pipe() 395 if mswindows: 396 pipe_buf = 512 397 else: 398 pipe_buf = os.fpathconf(x, "PC_PIPE_BUF") 399 os.close(x) 400 os.close(y) 401 p = subprocess.Popen([sys.executable, "-c", 402 'import sys,os;' 403 'sys.stdout.write(sys.stdin.read(47));' 404 'sys.stderr.write("xyz"*%d);' 405 'sys.stdout.write(sys.stdin.read())' % pipe_buf], 406 stdin=subprocess.PIPE, 407 stdout=subprocess.PIPE, 408 stderr=subprocess.PIPE) 409 self.addCleanup(p.stdout.close) 410 self.addCleanup(p.stderr.close) 411 self.addCleanup(p.stdin.close) 412 string_to_write = "abc"*pipe_buf 413 (stdout, stderr) = p.communicate(string_to_write) 414 self.assertEqual(stdout, string_to_write) 415 416 def test_writes_before_communicate(self): 417 # stdin.write before communicate() 418 p = subprocess.Popen([sys.executable, "-c", 419 'import sys,os;' 420 'sys.stdout.write(sys.stdin.read())'], 421 stdin=subprocess.PIPE, 422 stdout=subprocess.PIPE, 423 stderr=subprocess.PIPE) 424 self.addCleanup(p.stdout.close) 425 self.addCleanup(p.stderr.close) 426 self.addCleanup(p.stdin.close) 427 p.stdin.write("banana") 428 (stdout, stderr) = p.communicate("split") 429 self.assertEqual(stdout, "bananasplit") 430 self.assertStderrEqual(stderr, "") 431 432 def test_universal_newlines(self): 433 p = subprocess.Popen([sys.executable, "-c", 434 'import sys,os;' + SETBINARY + 435 'sys.stdout.write("line1\\n");' 436 'sys.stdout.flush();' 437 'sys.stdout.write("line2\\r");' 438 'sys.stdout.flush();' 439 'sys.stdout.write("line3\\r\\n");' 440 'sys.stdout.flush();' 441 'sys.stdout.write("line4\\r");' 442 'sys.stdout.flush();' 443 'sys.stdout.write("\\nline5");' 444 'sys.stdout.flush();' 445 'sys.stdout.write("\\nline6");'], 446 stdout=subprocess.PIPE, 447 universal_newlines=1) 448 self.addCleanup(p.stdout.close) 449 stdout = p.stdout.read() 450 if hasattr(file, 'newlines'): 451 # Interpreter with universal newline support 452 self.assertEqual(stdout, 453 "line1\nline2\nline3\nline4\nline5\nline6") 454 else: 455 # Interpreter without universal newline support 456 self.assertEqual(stdout, 457 "line1\nline2\rline3\r\nline4\r\nline5\nline6") 458 459 def test_universal_newlines_communicate(self): 460 # universal newlines through communicate() 461 p = subprocess.Popen([sys.executable, "-c", 462 'import sys,os;' + SETBINARY + 463 'sys.stdout.write("line1\\n");' 464 'sys.stdout.flush();' 465 'sys.stdout.write("line2\\r");' 466 'sys.stdout.flush();' 467 'sys.stdout.write("line3\\r\\n");' 468 'sys.stdout.flush();' 469 'sys.stdout.write("line4\\r");' 470 'sys.stdout.flush();' 471 'sys.stdout.write("\\nline5");' 472 'sys.stdout.flush();' 473 'sys.stdout.write("\\nline6");'], 474 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 475 universal_newlines=1) 476 self.addCleanup(p.stdout.close) 477 self.addCleanup(p.stderr.close) 478 (stdout, stderr) = p.communicate() 479 if hasattr(file, 'newlines'): 480 # Interpreter with universal newline support 481 self.assertEqual(stdout, 482 "line1\nline2\nline3\nline4\nline5\nline6") 483 else: 484 # Interpreter without universal newline support 485 self.assertEqual(stdout, 486 "line1\nline2\rline3\r\nline4\r\nline5\nline6") 487 488 def test_no_leaking(self): 489 # Make sure we leak no resources 490 if not mswindows: 491 max_handles = 1026 # too much for most UNIX systems 492 else: 493 max_handles = 2050 # too much for (at least some) Windows setups 494 handles = [] 495 try: 496 for i in range(max_handles): 497 try: 498 handles.append(os.open(test_support.TESTFN, 499 os.O_WRONLY | os.O_CREAT)) 500 except OSError as e: 501 if e.errno != errno.EMFILE: 502 raise 503 break 504 else: 505 self.skipTest("failed to reach the file descriptor limit " 506 "(tried %d)" % max_handles) 507 # Close a couple of them (should be enough for a subprocess) 508 for i in range(10): 509 os.close(handles.pop()) 510 # Loop creating some subprocesses. If one of them leaks some fds, 511 # the next loop iteration will fail by reaching the max fd limit. 512 for i in range(15): 513 p = subprocess.Popen([sys.executable, "-c", 514 "import sys;" 515 "sys.stdout.write(sys.stdin.read())"], 516 stdin=subprocess.PIPE, 517 stdout=subprocess.PIPE, 518 stderr=subprocess.PIPE) 519 data = p.communicate(b"lime")[0] 520 self.assertEqual(data, b"lime") 521 finally: 522 for h in handles: 523 os.close(h) 524 525 def test_list2cmdline(self): 526 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), 527 '"a b c" d e') 528 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), 529 'ab\\"c \\ d') 530 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), 531 'ab\\"c " \\\\" d') 532 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), 533 'a\\\\\\b "de fg" h') 534 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), 535 'a\\\\\\"b c d') 536 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), 537 '"a\\\\b c" d e') 538 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), 539 '"a\\\\b\\ c" d e') 540 self.assertEqual(subprocess.list2cmdline(['ab', '']), 541 'ab ""') 542 543 544 def test_poll(self): 545 p = subprocess.Popen([sys.executable, 546 "-c", "import time; time.sleep(1)"]) 547 count = 0 548 while p.poll() is None: 549 time.sleep(0.1) 550 count += 1 551 # We expect that the poll loop probably went around about 10 times, 552 # but, based on system scheduling we can't control, it's possible 553 # poll() never returned None. It "should be" very rare that it 554 # didn't go around at least twice. 555 self.assertGreaterEqual(count, 2) 556 # Subsequent invocations should just return the returncode 557 self.assertEqual(p.poll(), 0) 558 559 560 def test_wait(self): 561 p = subprocess.Popen([sys.executable, 562 "-c", "import time; time.sleep(2)"]) 563 self.assertEqual(p.wait(), 0) 564 # Subsequent invocations should just return the returncode 565 self.assertEqual(p.wait(), 0) 566 567 568 def test_invalid_bufsize(self): 569 # an invalid type of the bufsize argument should raise 570 # TypeError. 571 with self.assertRaises(TypeError): 572 subprocess.Popen([sys.executable, "-c", "pass"], "orange") 573 574 def test_leaking_fds_on_error(self): 575 # see bug #5179: Popen leaks file descriptors to PIPEs if 576 # the child fails to execute; this will eventually exhaust 577 # the maximum number of open fds. 1024 seems a very common 578 # value for that limit, but Windows has 2048, so we loop 579 # 1024 times (each call leaked two fds). 580 for i in range(1024): 581 # Windows raises IOError. Others raise OSError. 582 with self.assertRaises(EnvironmentError) as c: 583 subprocess.Popen(['nonexisting_i_hope'], 584 stdout=subprocess.PIPE, 585 stderr=subprocess.PIPE) 586 # ignore errors that indicate the command was not found 587 if c.exception.errno not in (errno.ENOENT, errno.EACCES): 588 raise c.exception 589 590 def test_handles_closed_on_exception(self): 591 # If CreateProcess exits with an error, ensure the 592 # duplicate output handles are released 593 ifhandle, ifname = mkstemp() 594 ofhandle, ofname = mkstemp() 595 efhandle, efname = mkstemp() 596 try: 597 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 598 stderr=efhandle) 599 except OSError: 600 os.close(ifhandle) 601 os.remove(ifname) 602 os.close(ofhandle) 603 os.remove(ofname) 604 os.close(efhandle) 605 os.remove(efname) 606 self.assertFalse(os.path.exists(ifname)) 607 self.assertFalse(os.path.exists(ofname)) 608 self.assertFalse(os.path.exists(efname)) 609 610 def test_communicate_epipe(self): 611 # Issue 10963: communicate() should hide EPIPE 612 p = subprocess.Popen([sys.executable, "-c", 'pass'], 613 stdin=subprocess.PIPE, 614 stdout=subprocess.PIPE, 615 stderr=subprocess.PIPE) 616 self.addCleanup(p.stdout.close) 617 self.addCleanup(p.stderr.close) 618 self.addCleanup(p.stdin.close) 619 p.communicate("x" * 2**20) 620 621 def test_communicate_epipe_only_stdin(self): 622 # Issue 10963: communicate() should hide EPIPE 623 p = subprocess.Popen([sys.executable, "-c", 'pass'], 624 stdin=subprocess.PIPE) 625 self.addCleanup(p.stdin.close) 626 time.sleep(2) 627 p.communicate("x" * 2**20) 628 629# context manager 630class _SuppressCoreFiles(object): 631 """Try to prevent core files from being created.""" 632 old_limit = None 633 634 def __enter__(self): 635 """Try to save previous ulimit, then set it to (0, 0).""" 636 try: 637 import resource 638 self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) 639 resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) 640 except (ImportError, ValueError, resource.error): 641 pass 642 643 if sys.platform == 'darwin': 644 # Check if the 'Crash Reporter' on OSX was configured 645 # in 'Developer' mode and warn that it will get triggered 646 # when it is. 647 # 648 # This assumes that this context manager is used in tests 649 # that might trigger the next manager. 650 value = subprocess.Popen(['/usr/bin/defaults', 'read', 651 'com.apple.CrashReporter', 'DialogType'], 652 stdout=subprocess.PIPE).communicate()[0] 653 if value.strip() == b'developer': 654 print "this tests triggers the Crash Reporter, that is intentional" 655 sys.stdout.flush() 656 657 def __exit__(self, *args): 658 """Return core file behavior to default.""" 659 if self.old_limit is None: 660 return 661 try: 662 import resource 663 resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) 664 except (ImportError, ValueError, resource.error): 665 pass 666 667 @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 668 "Requires signal.SIGALRM") 669 def test_communicate_eintr(self): 670 # Issue #12493: communicate() should handle EINTR 671 def handler(signum, frame): 672 pass 673 old_handler = signal.signal(signal.SIGALRM, handler) 674 self.addCleanup(signal.signal, signal.SIGALRM, old_handler) 675 676 # the process is running for 2 seconds 677 args = [sys.executable, "-c", 'import time; time.sleep(2)'] 678 for stream in ('stdout', 'stderr'): 679 kw = {stream: subprocess.PIPE} 680 with subprocess.Popen(args, **kw) as process: 681 signal.alarm(1) 682 # communicate() will be interrupted by SIGALRM 683 process.communicate() 684 685 686@unittest.skipIf(mswindows, "POSIX specific tests") 687class POSIXProcessTestCase(BaseTestCase): 688 689 def test_exceptions(self): 690 # caught & re-raised exceptions 691 with self.assertRaises(OSError) as c: 692 p = subprocess.Popen([sys.executable, "-c", ""], 693 cwd="/this/path/does/not/exist") 694 # The attribute child_traceback should contain "os.chdir" somewhere. 695 self.assertIn("os.chdir", c.exception.child_traceback) 696 697 def test_run_abort(self): 698 # returncode handles signal termination 699 with _SuppressCoreFiles(): 700 p = subprocess.Popen([sys.executable, "-c", 701 "import os; os.abort()"]) 702 p.wait() 703 self.assertEqual(-p.returncode, signal.SIGABRT) 704 705 def test_preexec(self): 706 # preexec function 707 p = subprocess.Popen([sys.executable, "-c", 708 "import sys, os;" 709 "sys.stdout.write(os.getenv('FRUIT'))"], 710 stdout=subprocess.PIPE, 711 preexec_fn=lambda: os.putenv("FRUIT", "apple")) 712 self.addCleanup(p.stdout.close) 713 self.assertEqual(p.stdout.read(), "apple") 714 715 def test_args_string(self): 716 # args is a string 717 f, fname = mkstemp() 718 os.write(f, "#!/bin/sh\n") 719 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 720 sys.executable) 721 os.close(f) 722 os.chmod(fname, 0o700) 723 p = subprocess.Popen(fname) 724 p.wait() 725 os.remove(fname) 726 self.assertEqual(p.returncode, 47) 727 728 def test_invalid_args(self): 729 # invalid arguments should raise ValueError 730 self.assertRaises(ValueError, subprocess.call, 731 [sys.executable, "-c", 732 "import sys; sys.exit(47)"], 733 startupinfo=47) 734 self.assertRaises(ValueError, subprocess.call, 735 [sys.executable, "-c", 736 "import sys; sys.exit(47)"], 737 creationflags=47) 738 739 def test_shell_sequence(self): 740 # Run command through the shell (sequence) 741 newenv = os.environ.copy() 742 newenv["FRUIT"] = "apple" 743 p = subprocess.Popen(["echo $FRUIT"], shell=1, 744 stdout=subprocess.PIPE, 745 env=newenv) 746 self.addCleanup(p.stdout.close) 747 self.assertEqual(p.stdout.read().strip(), "apple") 748 749 def test_shell_string(self): 750 # Run command through the shell (string) 751 newenv = os.environ.copy() 752 newenv["FRUIT"] = "apple" 753 p = subprocess.Popen("echo $FRUIT", shell=1, 754 stdout=subprocess.PIPE, 755 env=newenv) 756 self.addCleanup(p.stdout.close) 757 self.assertEqual(p.stdout.read().strip(), "apple") 758 759 def test_call_string(self): 760 # call() function with string argument on UNIX 761 f, fname = mkstemp() 762 os.write(f, "#!/bin/sh\n") 763 os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % 764 sys.executable) 765 os.close(f) 766 os.chmod(fname, 0700) 767 rc = subprocess.call(fname) 768 os.remove(fname) 769 self.assertEqual(rc, 47) 770 771 def test_specific_shell(self): 772 # Issue #9265: Incorrect name passed as arg[0]. 773 shells = [] 774 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: 775 for name in ['bash', 'ksh']: 776 sh = os.path.join(prefix, name) 777 if os.path.isfile(sh): 778 shells.append(sh) 779 if not shells: # Will probably work for any shell but csh. 780 self.skipTest("bash or ksh required for this test") 781 sh = '/bin/sh' 782 if os.path.isfile(sh) and not os.path.islink(sh): 783 # Test will fail if /bin/sh is a symlink to csh. 784 shells.append(sh) 785 for sh in shells: 786 p = subprocess.Popen("echo $0", executable=sh, shell=True, 787 stdout=subprocess.PIPE) 788 self.addCleanup(p.stdout.close) 789 self.assertEqual(p.stdout.read().strip(), sh) 790 791 def _kill_process(self, method, *args): 792 # Do not inherit file handles from the parent. 793 # It should fix failures on some platforms. 794 p = subprocess.Popen([sys.executable, "-c", """if 1: 795 import sys, time 796 sys.stdout.write('x\\n') 797 sys.stdout.flush() 798 time.sleep(30) 799 """], 800 close_fds=True, 801 stdin=subprocess.PIPE, 802 stdout=subprocess.PIPE, 803 stderr=subprocess.PIPE) 804 # Wait for the interpreter to be completely initialized before 805 # sending any signal. 806 p.stdout.read(1) 807 getattr(p, method)(*args) 808 return p 809 810 def test_send_signal(self): 811 p = self._kill_process('send_signal', signal.SIGINT) 812 _, stderr = p.communicate() 813 self.assertIn('KeyboardInterrupt', stderr) 814 self.assertNotEqual(p.wait(), 0) 815 816 def test_kill(self): 817 p = self._kill_process('kill') 818 _, stderr = p.communicate() 819 self.assertStderrEqual(stderr, '') 820 self.assertEqual(p.wait(), -signal.SIGKILL) 821 822 def test_terminate(self): 823 p = self._kill_process('terminate') 824 _, stderr = p.communicate() 825 self.assertStderrEqual(stderr, '') 826 self.assertEqual(p.wait(), -signal.SIGTERM) 827 828 def check_close_std_fds(self, fds): 829 # Issue #9905: test that subprocess pipes still work properly with 830 # some standard fds closed 831 stdin = 0 832 newfds = [] 833 for a in fds: 834 b = os.dup(a) 835 newfds.append(b) 836 if a == 0: 837 stdin = b 838 try: 839 for fd in fds: 840 os.close(fd) 841 out, err = subprocess.Popen([sys.executable, "-c", 842 'import sys;' 843 'sys.stdout.write("apple");' 844 'sys.stdout.flush();' 845 'sys.stderr.write("orange")'], 846 stdin=stdin, 847 stdout=subprocess.PIPE, 848 stderr=subprocess.PIPE).communicate() 849 err = test_support.strip_python_stderr(err) 850 self.assertEqual((out, err), (b'apple', b'orange')) 851 finally: 852 for b, a in zip(newfds, fds): 853 os.dup2(b, a) 854 for b in newfds: 855 os.close(b) 856 857 def test_close_fd_0(self): 858 self.check_close_std_fds([0]) 859 860 def test_close_fd_1(self): 861 self.check_close_std_fds([1]) 862 863 def test_close_fd_2(self): 864 self.check_close_std_fds([2]) 865 866 def test_close_fds_0_1(self): 867 self.check_close_std_fds([0, 1]) 868 869 def test_close_fds_0_2(self): 870 self.check_close_std_fds([0, 2]) 871 872 def test_close_fds_1_2(self): 873 self.check_close_std_fds([1, 2]) 874 875 def test_close_fds_0_1_2(self): 876 # Issue #10806: test that subprocess pipes still work properly with 877 # all standard fds closed. 878 self.check_close_std_fds([0, 1, 2]) 879 880 def check_swap_fds(self, stdin_no, stdout_no, stderr_no): 881 # open up some temporary files 882 temps = [mkstemp() for i in range(3)] 883 temp_fds = [fd for fd, fname in temps] 884 try: 885 # unlink the files -- we won't need to reopen them 886 for fd, fname in temps: 887 os.unlink(fname) 888 889 # save a copy of the standard file descriptors 890 saved_fds = [os.dup(fd) for fd in range(3)] 891 try: 892 # duplicate the temp files over the standard fd's 0, 1, 2 893 for fd, temp_fd in enumerate(temp_fds): 894 os.dup2(temp_fd, fd) 895 896 # write some data to what will become stdin, and rewind 897 os.write(stdin_no, b"STDIN") 898 os.lseek(stdin_no, 0, 0) 899 900 # now use those files in the given order, so that subprocess 901 # has to rearrange them in the child 902 p = subprocess.Popen([sys.executable, "-c", 903 'import sys; got = sys.stdin.read();' 904 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 905 stdin=stdin_no, 906 stdout=stdout_no, 907 stderr=stderr_no) 908 p.wait() 909 910 for fd in temp_fds: 911 os.lseek(fd, 0, 0) 912 913 out = os.read(stdout_no, 1024) 914 err = test_support.strip_python_stderr(os.read(stderr_no, 1024)) 915 finally: 916 for std, saved in enumerate(saved_fds): 917 os.dup2(saved, std) 918 os.close(saved) 919 920 self.assertEqual(out, b"got STDIN") 921 self.assertEqual(err, b"err") 922 923 finally: 924 for fd in temp_fds: 925 os.close(fd) 926 927 # When duping fds, if there arises a situation where one of the fds is 928 # either 0, 1 or 2, it is possible that it is overwritten (#12607). 929 # This tests all combinations of this. 930 def test_swap_fds(self): 931 self.check_swap_fds(0, 1, 2) 932 self.check_swap_fds(0, 2, 1) 933 self.check_swap_fds(1, 0, 2) 934 self.check_swap_fds(1, 2, 0) 935 self.check_swap_fds(2, 0, 1) 936 self.check_swap_fds(2, 1, 0) 937 938 def test_wait_when_sigchild_ignored(self): 939 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 940 sigchild_ignore = test_support.findfile("sigchild_ignore.py", 941 subdir="subprocessdata") 942 p = subprocess.Popen([sys.executable, sigchild_ignore], 943 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 944 stdout, stderr = p.communicate() 945 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 946 " non-zero with this error:\n%s" % stderr) 947 948 def test_zombie_fast_process_del(self): 949 # Issue #12650: on Unix, if Popen.__del__() was called before the 950 # process exited, it wouldn't be added to subprocess._active, and would 951 # remain a zombie. 952 # spawn a Popen, and delete its reference before it exits 953 p = subprocess.Popen([sys.executable, "-c", 954 'import sys, time;' 955 'time.sleep(0.2)'], 956 stdout=subprocess.PIPE, 957 stderr=subprocess.PIPE) 958 self.addCleanup(p.stdout.close) 959 self.addCleanup(p.stderr.close) 960 ident = id(p) 961 pid = p.pid 962 del p 963 # check that p is in the active processes list 964 self.assertIn(ident, [id(o) for o in subprocess._active]) 965 966 def test_leak_fast_process_del_killed(self): 967 # Issue #12650: on Unix, if Popen.__del__() was called before the 968 # process exited, and the process got killed by a signal, it would never 969 # be removed from subprocess._active, which triggered a FD and memory 970 # leak. 971 # spawn a Popen, delete its reference and kill it 972 p = subprocess.Popen([sys.executable, "-c", 973 'import time;' 974 'time.sleep(3)'], 975 stdout=subprocess.PIPE, 976 stderr=subprocess.PIPE) 977 self.addCleanup(p.stdout.close) 978 self.addCleanup(p.stderr.close) 979 ident = id(p) 980 pid = p.pid 981 del p 982 os.kill(pid, signal.SIGKILL) 983 # check that p is in the active processes list 984 self.assertIn(ident, [id(o) for o in subprocess._active]) 985 986 # let some time for the process to exit, and create a new Popen: this 987 # should trigger the wait() of p 988 time.sleep(0.2) 989 with self.assertRaises(EnvironmentError) as c: 990 with subprocess.Popen(['nonexisting_i_hope'], 991 stdout=subprocess.PIPE, 992 stderr=subprocess.PIPE) as proc: 993 pass 994 # p should have been wait()ed on, and removed from the _active list 995 self.assertRaises(OSError, os.waitpid, pid, 0) 996 self.assertNotIn(ident, [id(o) for o in subprocess._active]) 997 998 999@unittest.skipUnless(mswindows, "Windows specific tests") 1000class Win32ProcessTestCase(BaseTestCase): 1001 1002 def test_startupinfo(self): 1003 # startupinfo argument 1004 # We uses hardcoded constants, because we do not want to 1005 # depend on win32all. 1006 STARTF_USESHOWWINDOW = 1 1007 SW_MAXIMIZE = 3 1008 startupinfo = subprocess.STARTUPINFO() 1009 startupinfo.dwFlags = STARTF_USESHOWWINDOW 1010 startupinfo.wShowWindow = SW_MAXIMIZE 1011 # Since Python is a console process, it won't be affected 1012 # by wShowWindow, but the argument should be silently 1013 # ignored 1014 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"], 1015 startupinfo=startupinfo) 1016 1017 def test_creationflags(self): 1018 # creationflags argument 1019 CREATE_NEW_CONSOLE = 16 1020 sys.stderr.write(" a DOS box should flash briefly ...\n") 1021 subprocess.call(sys.executable + 1022 ' -c "import time; time.sleep(0.25)"', 1023 creationflags=CREATE_NEW_CONSOLE) 1024 1025 def test_invalid_args(self): 1026 # invalid arguments should raise ValueError 1027 self.assertRaises(ValueError, subprocess.call, 1028 [sys.executable, "-c", 1029 "import sys; sys.exit(47)"], 1030 preexec_fn=lambda: 1) 1031 self.assertRaises(ValueError, subprocess.call, 1032 [sys.executable, "-c", 1033 "import sys; sys.exit(47)"], 1034 stdout=subprocess.PIPE, 1035 close_fds=True) 1036 1037 def test_close_fds(self): 1038 # close file descriptors 1039 rc = subprocess.call([sys.executable, "-c", 1040 "import sys; sys.exit(47)"], 1041 close_fds=True) 1042 self.assertEqual(rc, 47) 1043 1044 def test_shell_sequence(self): 1045 # Run command through the shell (sequence) 1046 newenv = os.environ.copy() 1047 newenv["FRUIT"] = "physalis" 1048 p = subprocess.Popen(["set"], shell=1, 1049 stdout=subprocess.PIPE, 1050 env=newenv) 1051 self.addCleanup(p.stdout.close) 1052 self.assertIn("physalis", p.stdout.read()) 1053 1054 def test_shell_string(self): 1055 # Run command through the shell (string) 1056 newenv = os.environ.copy() 1057 newenv["FRUIT"] = "physalis" 1058 p = subprocess.Popen("set", shell=1, 1059 stdout=subprocess.PIPE, 1060 env=newenv) 1061 self.addCleanup(p.stdout.close) 1062 self.assertIn("physalis", p.stdout.read()) 1063 1064 def test_call_string(self): 1065 # call() function with string argument on Windows 1066 rc = subprocess.call(sys.executable + 1067 ' -c "import sys; sys.exit(47)"') 1068 self.assertEqual(rc, 47) 1069 1070 def _kill_process(self, method, *args): 1071 # Some win32 buildbot raises EOFError if stdin is inherited 1072 p = subprocess.Popen([sys.executable, "-c", """if 1: 1073 import sys, time 1074 sys.stdout.write('x\\n') 1075 sys.stdout.flush() 1076 time.sleep(30) 1077 """], 1078 stdin=subprocess.PIPE, 1079 stdout=subprocess.PIPE, 1080 stderr=subprocess.PIPE) 1081 self.addCleanup(p.stdout.close) 1082 self.addCleanup(p.stderr.close) 1083 self.addCleanup(p.stdin.close) 1084 # Wait for the interpreter to be completely initialized before 1085 # sending any signal. 1086 p.stdout.read(1) 1087 getattr(p, method)(*args) 1088 _, stderr = p.communicate() 1089 self.assertStderrEqual(stderr, '') 1090 returncode = p.wait() 1091 self.assertNotEqual(returncode, 0) 1092 1093 def test_send_signal(self): 1094 self._kill_process('send_signal', signal.SIGTERM) 1095 1096 def test_kill(self): 1097 self._kill_process('kill') 1098 1099 def test_terminate(self): 1100 self._kill_process('terminate') 1101 1102 1103@unittest.skipUnless(getattr(subprocess, '_has_poll', False), 1104 "poll system call not supported") 1105class ProcessTestCaseNoPoll(ProcessTestCase): 1106 def setUp(self): 1107 subprocess._has_poll = False 1108 ProcessTestCase.setUp(self) 1109 1110 def tearDown(self): 1111 subprocess._has_poll = True 1112 ProcessTestCase.tearDown(self) 1113 1114 1115class HelperFunctionTests(unittest.TestCase): 1116 @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") 1117 def test_eintr_retry_call(self): 1118 record_calls = [] 1119 def fake_os_func(*args): 1120 record_calls.append(args) 1121 if len(record_calls) == 2: 1122 raise OSError(errno.EINTR, "fake interrupted system call") 1123 return tuple(reversed(args)) 1124 1125 self.assertEqual((999, 256), 1126 subprocess._eintr_retry_call(fake_os_func, 256, 999)) 1127 self.assertEqual([(256, 999)], record_calls) 1128 # This time there will be an EINTR so it will loop once. 1129 self.assertEqual((666,), 1130 subprocess._eintr_retry_call(fake_os_func, 666)) 1131 self.assertEqual([(256, 999), (666,), (666,)], record_calls) 1132 1133@unittest.skipUnless(mswindows, "mswindows only") 1134class CommandsWithSpaces (BaseTestCase): 1135 1136 def setUp(self): 1137 super(CommandsWithSpaces, self).setUp() 1138 f, fname = mkstemp(".py", "te st") 1139 self.fname = fname.lower () 1140 os.write(f, b"import sys;" 1141 b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 1142 ) 1143 os.close(f) 1144 1145 def tearDown(self): 1146 os.remove(self.fname) 1147 super(CommandsWithSpaces, self).tearDown() 1148 1149 def with_spaces(self, *args, **kwargs): 1150 kwargs['stdout'] = subprocess.PIPE 1151 p = subprocess.Popen(*args, **kwargs) 1152 self.addCleanup(p.stdout.close) 1153 self.assertEqual( 1154 p.stdout.read ().decode("mbcs"), 1155 "2 [%r, 'ab cd']" % self.fname 1156 ) 1157 1158 def test_shell_string_with_spaces(self): 1159 # call() function with string argument with spaces on Windows 1160 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 1161 "ab cd"), shell=1) 1162 1163 def test_shell_sequence_with_spaces(self): 1164 # call() function with sequence argument with spaces on Windows 1165 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 1166 1167 def test_noshell_string_with_spaces(self): 1168 # call() function with string argument with spaces on Windows 1169 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 1170 "ab cd")) 1171 1172 def test_noshell_sequence_with_spaces(self): 1173 # call() function with sequence argument with spaces on Windows 1174 self.with_spaces([sys.executable, self.fname, "ab cd"]) 1175 1176def test_main(): 1177 unit_tests = (ProcessTestCase, 1178 POSIXProcessTestCase, 1179 Win32ProcessTestCase, 1180 ProcessTestCaseNoPoll, 1181 HelperFunctionTests, 1182 CommandsWithSpaces) 1183 1184 test_support.run_unittest(*unit_tests) 1185 test_support.reap_children() 1186 1187if __name__ == "__main__": 1188 test_main() 1189