# test_subprocess.py revision e790131dc6f2e4c125f29c882e64f3c88a4210d1
"""Regression tests for the subprocess module (Python 2 test suite).

Most tests spawn a child Python interpreter through ``sys.executable`` and
check its exit status and/or what it wrote to its standard streams.
"""
import unittest
from test import test_support
import subprocess
import sys
import signal
import os
import errno
import tempfile
import time
import re
import sysconfig

mswindows = (sys.platform == "win32")

#
# Depends on the following external programs: Python
#

if mswindows:
    # Prefix for child scripts: switch the child's stdout to binary mode so
    # that "\n" is not translated while testing newline handling.
    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
                 'os.O_BINARY);')
else:
    SETBINARY = ''


try:
    mkstemp = tempfile.mkstemp
except AttributeError:
    # tempfile.mkstemp is not available
    def mkstemp():
        """Replacement for mkstemp, calling mktemp."""
        fname = tempfile.mktemp()
        return os.open(fname, os.O_RDWR|os.O_CREAT), fname


class BaseTestCase(unittest.TestCase):
    """Common fixture: reap stray children and keep subprocess._active empty."""

    def setUp(self):
        # Try to minimize the number of children we have so this test
        # doesn't crash on some buildbots (Alphas in particular).
        test_support.reap_children()

    def tearDown(self):
        for inst in subprocess._active:
            inst.wait()
        subprocess._cleanup()
        self.assertFalse(subprocess._active, "subprocess._active not empty")

    def assertStderrEqual(self, stderr, expected, msg=None):
        """Assert stderr == expected after dropping a trailing "[N refs]"."""
        # In a debug build, stuff like "[6580 refs]" is printed to stderr at
        # shutdown time.  That frustrates tests trying to check stderr produced
        # from a spawned Python process.
        actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
        self.assertEqual(actual, expected, msg)


class ProcessTestCase(BaseTestCase):
    """Platform-independent tests for Popen, call, check_call, check_output."""

    def test_call_seq(self):
        # call() function with sequence argument
        rc = subprocess.call([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        self.assertEqual(rc, 47)

    def test_check_call_zero(self):
        # check_call() function with zero return code
        rc = subprocess.check_call([sys.executable, "-c",
                                    "import sys; sys.exit(0)"])
        self.assertEqual(rc, 0)

    def test_check_call_nonzero(self):
        # check_call() function with non-zero return code
        with self.assertRaises(subprocess.CalledProcessError) as c:
            subprocess.check_call([sys.executable, "-c",
                                   "import sys; sys.exit(47)"])
        self.assertEqual(c.exception.returncode, 47)

    def test_check_output(self):
        # check_output() function with zero return code
        output = subprocess.check_output(
                [sys.executable, "-c", "print 'BDFL'"])
        self.assertIn('BDFL', output)

    def test_check_output_nonzero(self):
        # check_output() function with non-zero return code
        with self.assertRaises(subprocess.CalledProcessError) as c:
            subprocess.check_output(
                    [sys.executable, "-c", "import sys; sys.exit(5)"])
        self.assertEqual(c.exception.returncode, 5)

    def test_check_output_stderr(self):
        # check_output() function stderr redirected to stdout
        output = subprocess.check_output(
                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
                stderr=subprocess.STDOUT)
        self.assertIn('BDFL', output)

    def test_check_output_stdout_arg(self):
        # check_output() must reject an explicit stdout argument
        with self.assertRaises(ValueError) as c:
            subprocess.check_output(
                    [sys.executable, "-c", "print 'will not be run'"],
                    stdout=sys.stdout)
            self.fail("Expected ValueError when stdout arg supplied.")
        self.assertIn('stdout', c.exception.args[0])

    def test_call_kwargs(self):
        # call() function with keyword args
        newenv = os.environ.copy()
        newenv["FRUIT"] = "banana"
        rc = subprocess.call([sys.executable, "-c",
                              'import sys, os;'
                              'sys.exit(os.getenv("FRUIT")=="banana")'],
                             env=newenv)
        self.assertEqual(rc, 1)

    def test_invalid_args(self):
        # Popen() called with invalid arguments should raise TypeError
        # but Popen.__del__ should not complain (issue #12085)
        with test_support.captured_stderr() as s:
            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
            argcount = subprocess.Popen.__init__.__code__.co_argcount
            too_many_args = [0] * (argcount + 1)
            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
        self.assertEqual(s.getvalue(), '')

    def test_stdin_none(self):
        # .stdin is None when not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        p.wait()
        self.assertEqual(p.stdin, None)

    def test_stdout_none(self):
        # .stdout is None when not redirected
        p = subprocess.Popen([sys.executable, "-c",
                              'print "    this bit of output is from a '
                              'test of stdout in a different '
                              'process ..."'],
                             stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        self.addCleanup(p.stdin.close)
        self.addCleanup(p.stderr.close)
        p.wait()
        self.assertEqual(p.stdout, None)

    def test_stderr_none(self):
        # .stderr is None when not redirected
        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stdin.close)
        p.wait()
        self.assertEqual(p.stderr, None)

    def test_executable_with_cwd(self):
        python_dir = os.path.dirname(os.path.realpath(sys.executable))
        p = subprocess.Popen(["somethingyoudonthave", "-c",
                              "import sys; sys.exit(47)"],
                             executable=sys.executable, cwd=python_dir)
        p.wait()
        self.assertEqual(p.returncode, 47)

    @unittest.skipIf(sysconfig.is_python_build(),
                     "need an installed Python. See #7774")
    def test_executable_without_cwd(self):
        # For a normal installation, it should work without 'cwd'
        # argument.  For test runs in the build directory, see #7774.
        p = subprocess.Popen(["somethingyoudonthave", "-c",
                              "import sys; sys.exit(47)"],
                             executable=sys.executable)
        p.wait()
        self.assertEqual(p.returncode, 47)

    def test_stdin_pipe(self):
        # stdin redirection
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=subprocess.PIPE)
        p.stdin.write("pear")
        p.stdin.close()
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_filedes(self):
        # stdin is set to open file descriptor
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        d = tf.fileno()
        os.write(d, "pear")
        os.lseek(d, 0, 0)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=d)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdin_fileobj(self):
        # stdin is set to open file object
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        tf.write("pear")
        tf.seek(0)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.exit(sys.stdin.read() == "pear")'],
                             stdin=tf)
        p.wait()
        self.assertEqual(p.returncode, 1)

    def test_stdout_pipe(self):
        # stdout redirection
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.assertEqual(p.stdout.read(), "orange")

    def test_stdout_filedes(self):
        # stdout is set to open file descriptor
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertEqual(os.read(d, 1024), "orange")

    def test_stdout_fileobj(self):
        # stdout is set to open file object
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("orange")'],
                             stdout=tf)
        p.wait()
        tf.seek(0)
        self.assertEqual(tf.read(), "orange")

    def test_stderr_pipe(self):
        # stderr redirection
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stderr.close)
        self.assertStderrEqual(p.stderr.read(), "strawberry")

    def test_stderr_filedes(self):
        # stderr is set to open file descriptor
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        d = tf.fileno()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=d)
        p.wait()
        os.lseek(d, 0, 0)
        self.assertStderrEqual(os.read(d, 1024), "strawberry")

    def test_stderr_fileobj(self):
        # stderr is set to open file object
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("strawberry")'],
                             stderr=tf)
        p.wait()
        tf.seek(0)
        self.assertStderrEqual(tf.read(), "strawberry")

    def test_stdout_stderr_pipe(self):
        # capture stdout and stderr to the same pipe
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        self.addCleanup(p.stdout.close)
        self.assertStderrEqual(p.stdout.read(), "appleorange")

    def test_stdout_stderr_file(self):
        # capture stdout and stderr to the same open file
        tf = tempfile.TemporaryFile()
        self.addCleanup(tf.close)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                             stdout=tf,
                             stderr=tf)
        p.wait()
        tf.seek(0)
        self.assertStderrEqual(tf.read(), "appleorange")

    def test_stdout_filedes_of_stdout(self):
        # stdout is set to 1 (#1531862).
        cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"
        rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
        self.assertEqual(rc, 2)

    def test_cwd(self):
        tmpdir = tempfile.gettempdir()
        # We cannot use os.path.realpath to canonicalize the path,
        # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
        cwd = os.getcwd()
        os.chdir(tmpdir)
        tmpdir = os.getcwd()
        os.chdir(cwd)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(os.getcwd())'],
                             stdout=subprocess.PIPE,
                             cwd=tmpdir)
        self.addCleanup(p.stdout.close)
        normcase = os.path.normcase
        self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))

    def test_env(self):
        newenv = os.environ.copy()
        newenv["FRUIT"] = "orange"
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(os.getenv("FRUIT"))'],
                             stdout=subprocess.PIPE,
                             env=newenv)
        self.addCleanup(p.stdout.close)
        self.assertEqual(p.stdout.read(), "orange")

    def test_communicate_stdin(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.exit(sys.stdin.read() == "pear")'],
                             stdin=subprocess.PIPE)
        p.communicate("pear")
        self.assertEqual(p.returncode, 1)

    def test_communicate_stdout(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stdout.write("pineapple")'],
                             stdout=subprocess.PIPE)
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, "pineapple")
        self.assertEqual(stderr, None)

    def test_communicate_stderr(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys; sys.stderr.write("pineapple")'],
                             stderr=subprocess.PIPE)
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, None)
        self.assertStderrEqual(stderr, "pineapple")

    def test_communicate(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stderr.write("pineapple");'
                              'sys.stdout.write(sys.stdin.read())'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        (stdout, stderr) = p.communicate("banana")
        self.assertEqual(stdout, "banana")
        self.assertStderrEqual(stderr, "pineapple")

    # This test is Linux specific for simplicity to at least have
    # some coverage.  It is not a platform specific bug.
    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
                         "Linux specific")
    # Test for the fd leak reported in http://bugs.python.org/issue2791.
    def test_communicate_pipe_fd_leak(self):
        fd_directory = '/proc/%d/fd' % os.getpid()
        num_fds_before_popen = len(os.listdir(fd_directory))
        p = subprocess.Popen([sys.executable, "-c", "print()"],
                             stdout=subprocess.PIPE)
        p.communicate()
        num_fds_after_communicate = len(os.listdir(fd_directory))
        del p
        num_fds_after_destruction = len(os.listdir(fd_directory))
        self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
        self.assertEqual(num_fds_before_popen, num_fds_after_communicate)

    def test_communicate_returns(self):
        # communicate() should return None if no redirection is active
        p = subprocess.Popen([sys.executable, "-c",
                              "import sys; sys.exit(47)"])
        (stdout, stderr) = p.communicate()
        self.assertEqual(stdout, None)
        self.assertEqual(stderr, None)

    def test_communicate_pipe_buf(self):
        # communicate() with writes larger than pipe_buf
        # This test will probably deadlock rather than fail, if
        # communicate() does not work properly.
        x, y = os.pipe()
        if mswindows:
            pipe_buf = 512
        else:
            pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
        os.close(x)
        os.close(y)
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(sys.stdin.read(47));'
                              'sys.stderr.write("xyz"*%d);'
                              'sys.stdout.write(sys.stdin.read())' % pipe_buf],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        string_to_write = "abc"*pipe_buf
        (stdout, stderr) = p.communicate(string_to_write)
        self.assertEqual(stdout, string_to_write)

    def test_writes_before_communicate(self):
        # stdin.write before communicate()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;'
                              'sys.stdout.write(sys.stdin.read())'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        p.stdin.write("banana")
        (stdout, stderr) = p.communicate("split")
        self.assertEqual(stdout, "bananasplit")
        self.assertStderrEqual(stderr, "")

    def test_universal_newlines(self):
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;' + SETBINARY +
                              'sys.stdout.write("line1\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line2\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line3\\r\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line4\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline5");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline6");'],
                             stdout=subprocess.PIPE,
                             universal_newlines=1)
        self.addCleanup(p.stdout.close)
        stdout = p.stdout.read()
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\nline3\nline4\nline5\nline6")
        else:
            # Interpreter without universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")

    def test_universal_newlines_communicate(self):
        # universal newlines through communicate()
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys,os;' + SETBINARY +
                              'sys.stdout.write("line1\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line2\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line3\\r\\n");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("line4\\r");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline5");'
                              'sys.stdout.flush();'
                              'sys.stdout.write("\\nline6");'],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=1)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        (stdout, stderr) = p.communicate()
        if hasattr(file, 'newlines'):
            # Interpreter with universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\nline3\nline4\nline5\nline6")
        else:
            # Interpreter without universal newline support
            self.assertEqual(stdout,
                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")

    def test_no_leaking(self):
        # Make sure we leak no resources
        if not mswindows:
            max_handles = 1026 # too much for most UNIX systems
        else:
            max_handles = 2050 # too much for (at least some) Windows setups
        handles = []
        try:
            for i in range(max_handles):
                try:
                    handles.append(os.open(test_support.TESTFN,
                                           os.O_WRONLY | os.O_CREAT))
                except OSError as e:
                    if e.errno != errno.EMFILE:
                        raise
                    break
            else:
                self.skipTest("failed to reach the file descriptor limit "
                              "(tried %d)" % max_handles)
            # Close a couple of them (should be enough for a subprocess)
            for i in range(10):
                os.close(handles.pop())
            # Loop creating some subprocesses. If one of them leaks some fds,
            # the next loop iteration will fail by reaching the max fd limit.
            for i in range(15):
                p = subprocess.Popen([sys.executable, "-c",
                                      "import sys;"
                                      "sys.stdout.write(sys.stdin.read())"],
                                     stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE)
                data = p.communicate(b"lime")[0]
                self.assertEqual(data, b"lime")
        finally:
            for h in handles:
                os.close(h)
            # The file was created by os.open(..., O_CREAT) above; do not
            # leave it behind for later tests.
            test_support.unlink(test_support.TESTFN)

    def test_list2cmdline(self):
        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
                         '"a b c" d e')
        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
                         'ab\\"c \\ d')
        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
                         'ab\\"c " \\\\" d')
        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
                         'a\\\\\\b "de fg" h')
        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
                         'a\\\\\\"b c d')
        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
                         '"a\\\\b c" d e')
        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
                         '"a\\\\b\\ c" d e')
        self.assertEqual(subprocess.list2cmdline(['ab', '']),
                         'ab ""')


    def test_poll(self):
        p = subprocess.Popen([sys.executable,
                              "-c", "import time; time.sleep(1)"])
        count = 0
        while p.poll() is None:
            time.sleep(0.1)
            count += 1
        # We expect that the poll loop probably went around about 10 times,
        # but, based on system scheduling we can't control, it's possible
        # poll() never returned None.  It "should be" very rare that it
        # didn't go around at least twice.
        self.assertGreaterEqual(count, 2)
        # Subsequent invocations should just return the returncode
        self.assertEqual(p.poll(), 0)


    def test_wait(self):
        p = subprocess.Popen([sys.executable,
                              "-c", "import time; time.sleep(2)"])
        self.assertEqual(p.wait(), 0)
        # Subsequent invocations should just return the returncode
        self.assertEqual(p.wait(), 0)


    def test_invalid_bufsize(self):
        # an invalid type of the bufsize argument should raise
        # TypeError.
        with self.assertRaises(TypeError):
            subprocess.Popen([sys.executable, "-c", "pass"], "orange")

    def test_leaking_fds_on_error(self):
        # see bug #5179: Popen leaks file descriptors to PIPEs if
        # the child fails to execute; this will eventually exhaust
        # the maximum number of open fds. 1024 seems a very common
        # value for that limit, but Windows has 2048, so we loop
        # 1024 times (each call leaked two fds).
        for i in range(1024):
            # Windows raises IOError.  Others raise OSError.
            with self.assertRaises(EnvironmentError) as c:
                subprocess.Popen(['nonexisting_i_hope'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            # ignore errors that indicate the command was not found
            if c.exception.errno not in (errno.ENOENT, errno.EACCES):
                raise c.exception

    def test_handles_closed_on_exception(self):
        # If CreateProcess exits with an error, ensure the
        # duplicate output handles are released
        ifhandle, ifname = mkstemp()
        ofhandle, ofname = mkstemp()
        efhandle, efname = mkstemp()
        try:
            subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
                              stderr=efhandle)
        except OSError:
            os.close(ifhandle)
            os.remove(ifname)
            os.close(ofhandle)
            os.remove(ofname)
            os.close(efhandle)
            os.remove(efname)
        self.assertFalse(os.path.exists(ifname))
        self.assertFalse(os.path.exists(ofname))
        self.assertFalse(os.path.exists(efname))

    def test_communicate_epipe(self):
        # Issue 10963: communicate() should hide EPIPE
        p = subprocess.Popen([sys.executable, "-c", 'pass'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        p.communicate("x" * 2**20)

    def test_communicate_epipe_only_stdin(self):
        # Issue 10963: communicate() should hide EPIPE
        p = subprocess.Popen([sys.executable, "-c", 'pass'],
                             stdin=subprocess.PIPE)
        self.addCleanup(p.stdin.close)
        time.sleep(2)
        p.communicate("x" * 2**20)

    # This test used to be nested under _SuppressCoreFiles (not a TestCase),
    # so it was never collected.  It lives here so that it actually runs.
    @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
                         "Requires signal.SIGALRM")
    def test_communicate_eintr(self):
        # Issue #12493: communicate() should handle EINTR
        def handler(signum, frame):
            pass
        old_handler = signal.signal(signal.SIGALRM, handler)
        self.addCleanup(signal.signal, signal.SIGALRM, old_handler)

        # the process is running for 2 seconds
        args = [sys.executable, "-c", 'import time; time.sleep(2)']
        for stream in ('stdout', 'stderr'):
            kw = {stream: subprocess.PIPE}
            # Popen is not a context manager in Python 2, so no "with" here.
            process = subprocess.Popen(args, **kw)
            signal.alarm(1)
            # communicate() will be interrupted by SIGALRM
            process.communicate()


# context manager
class _SuppressCoreFiles(object):
    """Try to prevent core files from being created."""
    old_limit = None

    def __enter__(self):
        """Try to save previous ulimit, then set it to (0, 0)."""
        # Import separately: if the import itself fails, "resource.error"
        # could not be evaluated in an except clause.
        try:
            import resource
        except ImportError:
            resource = None
        if resource is not None:
            try:
                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
            except (ValueError, resource.error):
                pass

        if sys.platform == 'darwin':
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            value = subprocess.Popen(['/usr/bin/defaults', 'read',
                    'com.apple.CrashReporter', 'DialogType'],
                    stdout=subprocess.PIPE).communicate()[0]
            if value.strip() == b'developer':
                sys.stdout.write("this tests triggers the Crash Reporter, "
                                 "that is intentional\n")
                sys.stdout.flush()

    def __exit__(self, *args):
        """Return core file behavior to default."""
        if self.old_limit is None:
            return
        try:
            import resource
        except ImportError:
            return
        try:
            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
        except (ValueError, resource.error):
            pass


@unittest.skipIf(mswindows, "POSIX specific tests")
class POSIXProcessTestCase(BaseTestCase):
    """Tests that rely on POSIX behaviour (signals, shells, fd semantics)."""

    def test_exceptions(self):
        # caught & re-raised exceptions
        with self.assertRaises(OSError) as c:
            subprocess.Popen([sys.executable, "-c", ""],
                             cwd="/this/path/does/not/exist")
        # The attribute child_traceback should contain "os.chdir" somewhere.
        self.assertIn("os.chdir", c.exception.child_traceback)

    def test_run_abort(self):
        # returncode handles signal termination
        with _SuppressCoreFiles():
            p = subprocess.Popen([sys.executable, "-c",
                                  "import os; os.abort()"])
            p.wait()
        self.assertEqual(-p.returncode, signal.SIGABRT)

    def test_preexec(self):
        # preexec function
        p = subprocess.Popen([sys.executable, "-c",
                              "import sys, os;"
                              "sys.stdout.write(os.getenv('FRUIT'))"],
                             stdout=subprocess.PIPE,
                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
        self.addCleanup(p.stdout.close)
        self.assertEqual(p.stdout.read(), "apple")

    def test_args_string(self):
        # args is a string
        f, fname = mkstemp()
        os.write(f, "#!/bin/sh\n")
        os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                    sys.executable)
        os.close(f)
        os.chmod(fname, 0o700)
        p = subprocess.Popen(fname)
        p.wait()
        os.remove(fname)
        self.assertEqual(p.returncode, 47)

    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c",
                           "import sys; sys.exit(47)"],
                          startupinfo=47)
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c",
                           "import sys; sys.exit(47)"],
                          creationflags=47)

    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "apple"
        p = subprocess.Popen(["echo $FRUIT"], shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
        self.addCleanup(p.stdout.close)
        self.assertEqual(p.stdout.read().strip(), "apple")

    def test_shell_string(self):
        # Run command through the shell (string)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "apple"
        p = subprocess.Popen("echo $FRUIT", shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
        self.addCleanup(p.stdout.close)
        self.assertEqual(p.stdout.read().strip(), "apple")

    def test_call_string(self):
        # call() function with string argument on UNIX
        f, fname = mkstemp()
        os.write(f, "#!/bin/sh\n")
        os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
                    sys.executable)
        os.close(f)
        os.chmod(fname, 0o700)
        rc = subprocess.call(fname)
        os.remove(fname)
        self.assertEqual(rc, 47)

    def test_specific_shell(self):
        # Issue #9265: Incorrect name passed as arg[0].
        shells = []
        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
            for name in ['bash', 'ksh']:
                sh = os.path.join(prefix, name)
                if os.path.isfile(sh):
                    shells.append(sh)
        if not shells: # Will probably work for any shell but csh.
            self.skipTest("bash or ksh required for this test")
        sh = '/bin/sh'
        if os.path.isfile(sh) and not os.path.islink(sh):
            # Test will fail if /bin/sh is a symlink to csh.
            shells.append(sh)
        for sh in shells:
            p = subprocess.Popen("echo $0", executable=sh, shell=True,
                                 stdout=subprocess.PIPE)
            self.addCleanup(p.stdout.close)
            self.assertEqual(p.stdout.read().strip(), sh)

    def _kill_process(self, method, *args):
        """Start a sleeping child, call p.<method>(*args), return the Popen."""
        # Do not inherit file handles from the parent.
        # It should fix failures on some platforms.
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             time.sleep(30)
                             """],
                             close_fds=True,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        p.stdout.read(1)
        getattr(p, method)(*args)
        return p

    def test_send_signal(self):
        p = self._kill_process('send_signal', signal.SIGINT)
        _, stderr = p.communicate()
        self.assertIn('KeyboardInterrupt', stderr)
        self.assertNotEqual(p.wait(), 0)

    def test_kill(self):
        p = self._kill_process('kill')
        _, stderr = p.communicate()
        self.assertStderrEqual(stderr, '')
        self.assertEqual(p.wait(), -signal.SIGKILL)

    def test_terminate(self):
        p = self._kill_process('terminate')
        _, stderr = p.communicate()
        self.assertStderrEqual(stderr, '')
        self.assertEqual(p.wait(), -signal.SIGTERM)

    def check_close_std_fds(self, fds):
        """Run a child with the given standard fds closed in the parent."""
        # Issue #9905: test that subprocess pipes still work properly with
        # some standard fds closed
        stdin = 0
        newfds = []
        for a in fds:
            # Keep a duplicate so the original fd can be restored afterwards.
            b = os.dup(a)
            newfds.append(b)
            if a == 0:
                stdin = b
        try:
            for fd in fds:
                os.close(fd)
            out, err = subprocess.Popen([sys.executable, "-c",
                              'import sys;'
                              'sys.stdout.write("apple");'
                              'sys.stdout.flush();'
                              'sys.stderr.write("orange")'],
                       stdin=stdin,
                       stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE).communicate()
            err = test_support.strip_python_stderr(err)
            self.assertEqual((out, err), (b'apple', b'orange'))
        finally:
            for b, a in zip(newfds, fds):
                os.dup2(b, a)
            for b in newfds:
                os.close(b)

    def test_close_fd_0(self):
        self.check_close_std_fds([0])

    def test_close_fd_1(self):
        self.check_close_std_fds([1])

    def test_close_fd_2(self):
        self.check_close_std_fds([2])

    def test_close_fds_0_1(self):
        self.check_close_std_fds([0, 1])

    def test_close_fds_0_2(self):
        self.check_close_std_fds([0, 2])

    def test_close_fds_1_2(self):
        self.check_close_std_fds([1, 2])

    def test_close_fds_0_1_2(self):
        # Issue #10806: test that subprocess pipes still work properly with
        # all standard fds closed.
        self.check_close_std_fds([0, 1, 2])

    def test_wait_when_sigchild_ignored(self):
        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
        sigchild_ignore = test_support.findfile("sigchild_ignore.py",
                                                subdir="subprocessdata")
        p = subprocess.Popen([sys.executable, sigchild_ignore],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
                         " non-zero with this error:\n%s" % stderr)


@unittest.skipUnless(mswindows, "Windows specific tests")
class Win32ProcessTestCase(BaseTestCase):
    """Tests for Windows-only arguments and behaviour."""

    def test_startupinfo(self):
        # startupinfo argument
        # We uses hardcoded constants, because we do not want to
        # depend on win32all.
        STARTF_USESHOWWINDOW = 1
        SW_MAXIMIZE = 3
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags = STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = SW_MAXIMIZE
        # Since Python is a console process, it won't be affected
        # by wShowWindow, but the argument should be silently
        # ignored
        subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
                        startupinfo=startupinfo)

    def test_creationflags(self):
        # creationflags argument
        CREATE_NEW_CONSOLE = 16
        sys.stderr.write("    a DOS box should flash briefly ...\n")
        subprocess.call(sys.executable +
                        ' -c "import time; time.sleep(0.25)"',
                        creationflags=CREATE_NEW_CONSOLE)

    def test_invalid_args(self):
        # invalid arguments should raise ValueError
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c",
                           "import sys; sys.exit(47)"],
                          preexec_fn=lambda: 1)
        self.assertRaises(ValueError, subprocess.call,
                          [sys.executable, "-c",
                           "import sys; sys.exit(47)"],
                          stdout=subprocess.PIPE,
                          close_fds=True)

    def test_close_fds(self):
        # close file descriptors
        rc = subprocess.call([sys.executable, "-c",
                              "import sys; sys.exit(47)"],
                             close_fds=True)
        self.assertEqual(rc, 47)

    def test_shell_sequence(self):
        # Run command through the shell (sequence)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "physalis"
        p = subprocess.Popen(["set"], shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
        self.addCleanup(p.stdout.close)
        self.assertIn("physalis", p.stdout.read())

    def test_shell_string(self):
        # Run command through the shell (string)
        newenv = os.environ.copy()
        newenv["FRUIT"] = "physalis"
        p = subprocess.Popen("set", shell=1,
                             stdout=subprocess.PIPE,
                             env=newenv)
        self.addCleanup(p.stdout.close)
        self.assertIn("physalis", p.stdout.read())

    def test_call_string(self):
        # call() function with string argument on Windows
        rc = subprocess.call(sys.executable +
                             ' -c "import sys; sys.exit(47)"')
        self.assertEqual(rc, 47)

    def _kill_process(self, method, *args):
        """Start a sleeping child, call p.<method>(*args), check the result."""
        # Some win32 buildbot raises EOFError if stdin is inherited
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                             import sys, time
                             sys.stdout.write('x\\n')
                             sys.stdout.flush()
                             time.sleep(30)
                             """],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        self.addCleanup(p.stdin.close)
        # Wait for the interpreter to be completely initialized before
        # sending any signal.
        p.stdout.read(1)
        getattr(p, method)(*args)
        _, stderr = p.communicate()
        self.assertStderrEqual(stderr, '')
        returncode = p.wait()
        self.assertNotEqual(returncode, 0)

    def test_send_signal(self):
        self._kill_process('send_signal', signal.SIGTERM)

    def test_kill(self):
        self._kill_process('kill')

    def test_terminate(self):
        self._kill_process('terminate')


@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
                     "poll system call not supported")
class ProcessTestCaseNoPoll(ProcessTestCase):
    """Re-run ProcessTestCase with subprocess._has_poll forced to False."""

    def setUp(self):
        subprocess._has_poll = False
        ProcessTestCase.setUp(self)

    def tearDown(self):
        # The class is skipped unless _has_poll was true to begin with, so
        # restoring True puts the module back in its original state.
        subprocess._has_poll = True
        ProcessTestCase.tearDown(self)


class HelperFunctionTests(unittest.TestCase):
    """Tests for private helper functions of the subprocess module."""

    @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
    def test_eintr_retry_call(self):
        record_calls = []
        def fake_os_func(*args):
            record_calls.append(args)
            if len(record_calls) == 2:
                raise OSError(errno.EINTR, "fake interrupted system call")
            return tuple(reversed(args))

        self.assertEqual((999, 256),
                         subprocess._eintr_retry_call(fake_os_func, 256, 999))
        self.assertEqual([(256, 999)], record_calls)
        # This time there will be an EINTR so it will loop once.
        self.assertEqual((666,),
                         subprocess._eintr_retry_call(fake_os_func, 666))
        self.assertEqual([(256, 999), (666,), (666,)], record_calls)


@unittest.skipUnless(mswindows, "mswindows only")
class CommandsWithSpaces (BaseTestCase):
    """Windows: script paths and arguments that contain spaces."""

    def setUp(self):
        super(CommandsWithSpaces, self).setUp()
        f, fname = mkstemp(".py", "te st")
        self.fname = fname.lower ()
        os.write(f, b"import sys;"
                    b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
        )
        os.close(f)

    def tearDown(self):
        os.remove(self.fname)
        super(CommandsWithSpaces, self).tearDown()

    def with_spaces(self, *args, **kwargs):
        """Run Popen(*args, **kwargs) and check the argv the script reports."""
        kwargs['stdout'] = subprocess.PIPE
        p = subprocess.Popen(*args, **kwargs)
        self.addCleanup(p.stdout.close)
        self.assertEqual(
          p.stdout.read ().decode("mbcs"),
          "2 [%r, 'ab cd']" % self.fname
        )

    def test_shell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
                                             "ab cd"), shell=1)

    def test_shell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)

    def test_noshell_string_with_spaces(self):
        # call() function with string argument with spaces on Windows
        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
                                             "ab cd"))

    def test_noshell_sequence_with_spaces(self):
        # call() function with sequence argument with spaces on Windows
        self.with_spaces([sys.executable, self.fname, "ab cd"])


def test_main():
    """Run every test case class in this module, then reap child processes."""
    unit_tests = (ProcessTestCase,
                  POSIXProcessTestCase,
                  Win32ProcessTestCase,
                  ProcessTestCaseNoPoll,
                  HelperFunctionTests,
                  CommandsWithSpaces)

    test_support.run_unittest(*unit_tests)
    test_support.reap_children()
# Run the whole suite through test_main() when this file is executed
# directly as a script.
if __name__ == "__main__":
    test_main()