1import unittest
2from test import test_support
3import subprocess
4import sys
5import signal
6import os
7import errno
8import tempfile
9import time
10import re
11import sysconfig
12
13try:
14    import resource
15except ImportError:
16    resource = None
17
18mswindows = (sys.platform == "win32")
19
20#
21# Depends on the following external programs: Python
22#
23
24if mswindows:
25    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
26                                                'os.O_BINARY);')
27else:
28    SETBINARY = ''
29
30
31try:
32    mkstemp = tempfile.mkstemp
33except AttributeError:
34    # tempfile.mkstemp is not available
35    def mkstemp():
36        """Replacement for mkstemp, calling mktemp."""
37        fname = tempfile.mktemp()
38        return os.open(fname, os.O_RDWR|os.O_CREAT), fname
39
40
41class BaseTestCase(unittest.TestCase):
42    def setUp(self):
43        # Try to minimize the number of children we have so this test
44        # doesn't crash on some buildbots (Alphas in particular).
45        test_support.reap_children()
46
47    def tearDown(self):
48        for inst in subprocess._active:
49            inst.wait()
50        subprocess._cleanup()
51        self.assertFalse(subprocess._active, "subprocess._active not empty")
52
53    def assertStderrEqual(self, stderr, expected, msg=None):
54        # In a debug build, stuff like "[6580 refs]" is printed to stderr at
55        # shutdown time.  That frustrates tests trying to check stderr produced
56        # from a spawned Python process.
57        actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
58        self.assertEqual(actual, expected, msg)
59
60
61class PopenTestException(Exception):
62    pass
63
64
65class PopenExecuteChildRaises(subprocess.Popen):
66    """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
67    _execute_child fails.
68    """
69    def _execute_child(self, *args, **kwargs):
70        raise PopenTestException("Forced Exception for Test")
71
72
73class ProcessTestCase(BaseTestCase):
74
75    def test_call_seq(self):
76        # call() function with sequence argument
77        rc = subprocess.call([sys.executable, "-c",
78                              "import sys; sys.exit(47)"])
79        self.assertEqual(rc, 47)
80
81    def test_check_call_zero(self):
82        # check_call() function with zero return code
83        rc = subprocess.check_call([sys.executable, "-c",
84                                    "import sys; sys.exit(0)"])
85        self.assertEqual(rc, 0)
86
87    def test_check_call_nonzero(self):
88        # check_call() function with non-zero return code
89        with self.assertRaises(subprocess.CalledProcessError) as c:
90            subprocess.check_call([sys.executable, "-c",
91                                   "import sys; sys.exit(47)"])
92        self.assertEqual(c.exception.returncode, 47)
93
94    def test_check_output(self):
95        # check_output() function with zero return code
96        output = subprocess.check_output(
97                [sys.executable, "-c", "print 'BDFL'"])
98        self.assertIn('BDFL', output)
99
100    def test_check_output_nonzero(self):
101        # check_call() function with non-zero return code
102        with self.assertRaises(subprocess.CalledProcessError) as c:
103            subprocess.check_output(
104                    [sys.executable, "-c", "import sys; sys.exit(5)"])
105        self.assertEqual(c.exception.returncode, 5)
106
107    def test_check_output_stderr(self):
108        # check_output() function stderr redirected to stdout
109        output = subprocess.check_output(
110                [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
111                stderr=subprocess.STDOUT)
112        self.assertIn('BDFL', output)
113
114    def test_check_output_stdout_arg(self):
115        # check_output() function stderr redirected to stdout
116        with self.assertRaises(ValueError) as c:
117            output = subprocess.check_output(
118                    [sys.executable, "-c", "print 'will not be run'"],
119                    stdout=sys.stdout)
120            self.fail("Expected ValueError when stdout arg supplied.")
121        self.assertIn('stdout', c.exception.args[0])
122
123    def test_call_kwargs(self):
124        # call() function with keyword args
125        newenv = os.environ.copy()
126        newenv["FRUIT"] = "banana"
127        rc = subprocess.call([sys.executable, "-c",
128                              'import sys, os;'
129                              'sys.exit(os.getenv("FRUIT")=="banana")'],
130                             env=newenv)
131        self.assertEqual(rc, 1)
132
133    def test_invalid_args(self):
134        # Popen() called with invalid arguments should raise TypeError
135        # but Popen.__del__ should not complain (issue #12085)
136        with test_support.captured_stderr() as s:
137            self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
138            argcount = subprocess.Popen.__init__.__code__.co_argcount
139            too_many_args = [0] * (argcount + 1)
140            self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
141        self.assertEqual(s.getvalue(), '')
142
143    def test_stdin_none(self):
144        # .stdin is None when not redirected
145        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
146                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
147        self.addCleanup(p.stdout.close)
148        self.addCleanup(p.stderr.close)
149        p.wait()
150        self.assertEqual(p.stdin, None)
151
152    def test_stdout_none(self):
153        # .stdout is None when not redirected, and the child's stdout will
154        # be inherited from the parent.  In order to test this we run a
155        # subprocess in a subprocess:
156        # this_test
157        #   \-- subprocess created by this test (parent)
158        #          \-- subprocess created by the parent subprocess (child)
159        # The parent doesn't specify stdout, so the child will use the
160        # parent's stdout.  This test checks that the message printed by the
161        # child goes to the parent stdout.  The parent also checks that the
162        # child's stdout is None.  See #11963.
163        code = ('import sys; from subprocess import Popen, PIPE;'
164                'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],'
165                '          stdin=PIPE, stderr=PIPE);'
166                'p.wait(); assert p.stdout is None;')
167        p = subprocess.Popen([sys.executable, "-c", code],
168                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
169        self.addCleanup(p.stdout.close)
170        self.addCleanup(p.stderr.close)
171        out, err = p.communicate()
172        self.assertEqual(p.returncode, 0, err)
173        self.assertEqual(out.rstrip(), 'test_stdout_none')
174
175    def test_stderr_none(self):
176        # .stderr is None when not redirected
177        p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
178                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
179        self.addCleanup(p.stdout.close)
180        self.addCleanup(p.stdin.close)
181        p.wait()
182        self.assertEqual(p.stderr, None)
183
184    def test_executable_with_cwd(self):
185        python_dir = os.path.dirname(os.path.realpath(sys.executable))
186        p = subprocess.Popen(["somethingyoudonthave", "-c",
187                              "import sys; sys.exit(47)"],
188                             executable=sys.executable, cwd=python_dir)
189        p.wait()
190        self.assertEqual(p.returncode, 47)
191
192    @unittest.skipIf(sysconfig.is_python_build(),
193                     "need an installed Python. See #7774")
194    def test_executable_without_cwd(self):
195        # For a normal installation, it should work without 'cwd'
196        # argument.  For test runs in the build directory, see #7774.
197        p = subprocess.Popen(["somethingyoudonthave", "-c",
198                              "import sys; sys.exit(47)"],
199                             executable=sys.executable)
200        p.wait()
201        self.assertEqual(p.returncode, 47)
202
203    def test_stdin_pipe(self):
204        # stdin redirection
205        p = subprocess.Popen([sys.executable, "-c",
206                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
207                        stdin=subprocess.PIPE)
208        p.stdin.write("pear")
209        p.stdin.close()
210        p.wait()
211        self.assertEqual(p.returncode, 1)
212
213    def test_stdin_filedes(self):
214        # stdin is set to open file descriptor
215        tf = tempfile.TemporaryFile()
216        d = tf.fileno()
217        os.write(d, "pear")
218        os.lseek(d, 0, 0)
219        p = subprocess.Popen([sys.executable, "-c",
220                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
221                         stdin=d)
222        p.wait()
223        self.assertEqual(p.returncode, 1)
224
225    def test_stdin_fileobj(self):
226        # stdin is set to open file object
227        tf = tempfile.TemporaryFile()
228        tf.write("pear")
229        tf.seek(0)
230        p = subprocess.Popen([sys.executable, "-c",
231                         'import sys; sys.exit(sys.stdin.read() == "pear")'],
232                         stdin=tf)
233        p.wait()
234        self.assertEqual(p.returncode, 1)
235
236    def test_stdout_pipe(self):
237        # stdout redirection
238        p = subprocess.Popen([sys.executable, "-c",
239                          'import sys; sys.stdout.write("orange")'],
240                         stdout=subprocess.PIPE)
241        self.addCleanup(p.stdout.close)
242        self.assertEqual(p.stdout.read(), "orange")
243
244    def test_stdout_filedes(self):
245        # stdout is set to open file descriptor
246        tf = tempfile.TemporaryFile()
247        d = tf.fileno()
248        p = subprocess.Popen([sys.executable, "-c",
249                          'import sys; sys.stdout.write("orange")'],
250                         stdout=d)
251        p.wait()
252        os.lseek(d, 0, 0)
253        self.assertEqual(os.read(d, 1024), "orange")
254
255    def test_stdout_fileobj(self):
256        # stdout is set to open file object
257        tf = tempfile.TemporaryFile()
258        p = subprocess.Popen([sys.executable, "-c",
259                          'import sys; sys.stdout.write("orange")'],
260                         stdout=tf)
261        p.wait()
262        tf.seek(0)
263        self.assertEqual(tf.read(), "orange")
264
265    def test_stderr_pipe(self):
266        # stderr redirection
267        p = subprocess.Popen([sys.executable, "-c",
268                          'import sys; sys.stderr.write("strawberry")'],
269                         stderr=subprocess.PIPE)
270        self.addCleanup(p.stderr.close)
271        self.assertStderrEqual(p.stderr.read(), "strawberry")
272
273    def test_stderr_filedes(self):
274        # stderr is set to open file descriptor
275        tf = tempfile.TemporaryFile()
276        d = tf.fileno()
277        p = subprocess.Popen([sys.executable, "-c",
278                          'import sys; sys.stderr.write("strawberry")'],
279                         stderr=d)
280        p.wait()
281        os.lseek(d, 0, 0)
282        self.assertStderrEqual(os.read(d, 1024), "strawberry")
283
284    def test_stderr_fileobj(self):
285        # stderr is set to open file object
286        tf = tempfile.TemporaryFile()
287        p = subprocess.Popen([sys.executable, "-c",
288                          'import sys; sys.stderr.write("strawberry")'],
289                         stderr=tf)
290        p.wait()
291        tf.seek(0)
292        self.assertStderrEqual(tf.read(), "strawberry")
293
294    def test_stdout_stderr_pipe(self):
295        # capture stdout and stderr to the same pipe
296        p = subprocess.Popen([sys.executable, "-c",
297                          'import sys;'
298                          'sys.stdout.write("apple");'
299                          'sys.stdout.flush();'
300                          'sys.stderr.write("orange")'],
301                         stdout=subprocess.PIPE,
302                         stderr=subprocess.STDOUT)
303        self.addCleanup(p.stdout.close)
304        self.assertStderrEqual(p.stdout.read(), "appleorange")
305
306    def test_stdout_stderr_file(self):
307        # capture stdout and stderr to the same open file
308        tf = tempfile.TemporaryFile()
309        p = subprocess.Popen([sys.executable, "-c",
310                          'import sys;'
311                          'sys.stdout.write("apple");'
312                          'sys.stdout.flush();'
313                          'sys.stderr.write("orange")'],
314                         stdout=tf,
315                         stderr=tf)
316        p.wait()
317        tf.seek(0)
318        self.assertStderrEqual(tf.read(), "appleorange")
319
320    def test_stdout_filedes_of_stdout(self):
321        # stdout is set to 1 (#1531862).
322        # To avoid printing the text on stdout, we do something similar to
323        # test_stdout_none (see above).  The parent subprocess calls the child
324        # subprocess passing stdout=1, and this test uses stdout=PIPE in
325        # order to capture and check the output of the parent. See #11963.
326        code = ('import sys, subprocess; '
327                'rc = subprocess.call([sys.executable, "-c", '
328                '    "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
329                     '\'test with stdout=1\'))"], stdout=1); '
330                'assert rc == 18')
331        p = subprocess.Popen([sys.executable, "-c", code],
332                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
333        self.addCleanup(p.stdout.close)
334        self.addCleanup(p.stderr.close)
335        out, err = p.communicate()
336        self.assertEqual(p.returncode, 0, err)
337        self.assertEqual(out.rstrip(), 'test with stdout=1')
338
339    def test_cwd(self):
340        tmpdir = tempfile.gettempdir()
341        # We cannot use os.path.realpath to canonicalize the path,
342        # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
343        cwd = os.getcwd()
344        os.chdir(tmpdir)
345        tmpdir = os.getcwd()
346        os.chdir(cwd)
347        p = subprocess.Popen([sys.executable, "-c",
348                          'import sys,os;'
349                          'sys.stdout.write(os.getcwd())'],
350                         stdout=subprocess.PIPE,
351                         cwd=tmpdir)
352        self.addCleanup(p.stdout.close)
353        normcase = os.path.normcase
354        self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
355
356    def test_env(self):
357        newenv = os.environ.copy()
358        newenv["FRUIT"] = "orange"
359        p = subprocess.Popen([sys.executable, "-c",
360                          'import sys,os;'
361                          'sys.stdout.write(os.getenv("FRUIT"))'],
362                         stdout=subprocess.PIPE,
363                         env=newenv)
364        self.addCleanup(p.stdout.close)
365        self.assertEqual(p.stdout.read(), "orange")
366
367    def test_communicate_stdin(self):
368        p = subprocess.Popen([sys.executable, "-c",
369                              'import sys;'
370                              'sys.exit(sys.stdin.read() == "pear")'],
371                             stdin=subprocess.PIPE)
372        p.communicate("pear")
373        self.assertEqual(p.returncode, 1)
374
375    def test_communicate_stdout(self):
376        p = subprocess.Popen([sys.executable, "-c",
377                              'import sys; sys.stdout.write("pineapple")'],
378                             stdout=subprocess.PIPE)
379        (stdout, stderr) = p.communicate()
380        self.assertEqual(stdout, "pineapple")
381        self.assertEqual(stderr, None)
382
383    def test_communicate_stderr(self):
384        p = subprocess.Popen([sys.executable, "-c",
385                              'import sys; sys.stderr.write("pineapple")'],
386                             stderr=subprocess.PIPE)
387        (stdout, stderr) = p.communicate()
388        self.assertEqual(stdout, None)
389        self.assertStderrEqual(stderr, "pineapple")
390
391    def test_communicate(self):
392        p = subprocess.Popen([sys.executable, "-c",
393                          'import sys,os;'
394                          'sys.stderr.write("pineapple");'
395                          'sys.stdout.write(sys.stdin.read())'],
396                         stdin=subprocess.PIPE,
397                         stdout=subprocess.PIPE,
398                         stderr=subprocess.PIPE)
399        self.addCleanup(p.stdout.close)
400        self.addCleanup(p.stderr.close)
401        self.addCleanup(p.stdin.close)
402        (stdout, stderr) = p.communicate("banana")
403        self.assertEqual(stdout, "banana")
404        self.assertStderrEqual(stderr, "pineapple")
405
406    # This test is Linux specific for simplicity to at least have
407    # some coverage.  It is not a platform specific bug.
408    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
409                         "Linux specific")
410    # Test for the fd leak reported in http://bugs.python.org/issue2791.
411    def test_communicate_pipe_fd_leak(self):
412        fd_directory = '/proc/%d/fd' % os.getpid()
413        num_fds_before_popen = len(os.listdir(fd_directory))
414        p = subprocess.Popen([sys.executable, "-c", "print()"],
415                             stdout=subprocess.PIPE)
416        p.communicate()
417        num_fds_after_communicate = len(os.listdir(fd_directory))
418        del p
419        num_fds_after_destruction = len(os.listdir(fd_directory))
420        self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
421        self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
422
423    def test_communicate_returns(self):
424        # communicate() should return None if no redirection is active
425        p = subprocess.Popen([sys.executable, "-c",
426                              "import sys; sys.exit(47)"])
427        (stdout, stderr) = p.communicate()
428        self.assertEqual(stdout, None)
429        self.assertEqual(stderr, None)
430
431    def test_communicate_pipe_buf(self):
432        # communicate() with writes larger than pipe_buf
433        # This test will probably deadlock rather than fail, if
434        # communicate() does not work properly.
435        x, y = os.pipe()
436        if mswindows:
437            pipe_buf = 512
438        else:
439            pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
440        os.close(x)
441        os.close(y)
442        p = subprocess.Popen([sys.executable, "-c",
443                          'import sys,os;'
444                          'sys.stdout.write(sys.stdin.read(47));'
445                          'sys.stderr.write("xyz"*%d);'
446                          'sys.stdout.write(sys.stdin.read())' % pipe_buf],
447                         stdin=subprocess.PIPE,
448                         stdout=subprocess.PIPE,
449                         stderr=subprocess.PIPE)
450        self.addCleanup(p.stdout.close)
451        self.addCleanup(p.stderr.close)
452        self.addCleanup(p.stdin.close)
453        string_to_write = "abc"*pipe_buf
454        (stdout, stderr) = p.communicate(string_to_write)
455        self.assertEqual(stdout, string_to_write)
456
457    def test_writes_before_communicate(self):
458        # stdin.write before communicate()
459        p = subprocess.Popen([sys.executable, "-c",
460                          'import sys,os;'
461                          'sys.stdout.write(sys.stdin.read())'],
462                         stdin=subprocess.PIPE,
463                         stdout=subprocess.PIPE,
464                         stderr=subprocess.PIPE)
465        self.addCleanup(p.stdout.close)
466        self.addCleanup(p.stderr.close)
467        self.addCleanup(p.stdin.close)
468        p.stdin.write("banana")
469        (stdout, stderr) = p.communicate("split")
470        self.assertEqual(stdout, "bananasplit")
471        self.assertStderrEqual(stderr, "")
472
473    def test_universal_newlines(self):
474        p = subprocess.Popen([sys.executable, "-c",
475                          'import sys,os;' + SETBINARY +
476                          'sys.stdout.write("line1\\n");'
477                          'sys.stdout.flush();'
478                          'sys.stdout.write("line2\\r");'
479                          'sys.stdout.flush();'
480                          'sys.stdout.write("line3\\r\\n");'
481                          'sys.stdout.flush();'
482                          'sys.stdout.write("line4\\r");'
483                          'sys.stdout.flush();'
484                          'sys.stdout.write("\\nline5");'
485                          'sys.stdout.flush();'
486                          'sys.stdout.write("\\nline6");'],
487                         stdout=subprocess.PIPE,
488                         universal_newlines=1)
489        self.addCleanup(p.stdout.close)
490        stdout = p.stdout.read()
491        if hasattr(file, 'newlines'):
492            # Interpreter with universal newline support
493            self.assertEqual(stdout,
494                             "line1\nline2\nline3\nline4\nline5\nline6")
495        else:
496            # Interpreter without universal newline support
497            self.assertEqual(stdout,
498                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")
499
500    def test_universal_newlines_communicate(self):
501        # universal newlines through communicate()
502        p = subprocess.Popen([sys.executable, "-c",
503                          'import sys,os;' + SETBINARY +
504                          'sys.stdout.write("line1\\n");'
505                          'sys.stdout.flush();'
506                          'sys.stdout.write("line2\\r");'
507                          'sys.stdout.flush();'
508                          'sys.stdout.write("line3\\r\\n");'
509                          'sys.stdout.flush();'
510                          'sys.stdout.write("line4\\r");'
511                          'sys.stdout.flush();'
512                          'sys.stdout.write("\\nline5");'
513                          'sys.stdout.flush();'
514                          'sys.stdout.write("\\nline6");'],
515                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
516                         universal_newlines=1)
517        self.addCleanup(p.stdout.close)
518        self.addCleanup(p.stderr.close)
519        (stdout, stderr) = p.communicate()
520        if hasattr(file, 'newlines'):
521            # Interpreter with universal newline support
522            self.assertEqual(stdout,
523                             "line1\nline2\nline3\nline4\nline5\nline6")
524        else:
525            # Interpreter without universal newline support
526            self.assertEqual(stdout,
527                             "line1\nline2\rline3\r\nline4\r\nline5\nline6")
528
529    def test_no_leaking(self):
530        # Make sure we leak no resources
531        if not mswindows:
532            max_handles = 1026 # too much for most UNIX systems
533        else:
534            max_handles = 2050 # too much for (at least some) Windows setups
535        handles = []
536        try:
537            for i in range(max_handles):
538                try:
539                    handles.append(os.open(test_support.TESTFN,
540                                           os.O_WRONLY | os.O_CREAT))
541                except OSError as e:
542                    if e.errno != errno.EMFILE:
543                        raise
544                    break
545            else:
546                self.skipTest("failed to reach the file descriptor limit "
547                    "(tried %d)" % max_handles)
548            # Close a couple of them (should be enough for a subprocess)
549            for i in range(10):
550                os.close(handles.pop())
551            # Loop creating some subprocesses. If one of them leaks some fds,
552            # the next loop iteration will fail by reaching the max fd limit.
553            for i in range(15):
554                p = subprocess.Popen([sys.executable, "-c",
555                                      "import sys;"
556                                      "sys.stdout.write(sys.stdin.read())"],
557                                     stdin=subprocess.PIPE,
558                                     stdout=subprocess.PIPE,
559                                     stderr=subprocess.PIPE)
560                data = p.communicate(b"lime")[0]
561                self.assertEqual(data, b"lime")
562        finally:
563            for h in handles:
564                os.close(h)
565            test_support.unlink(test_support.TESTFN)
566
567    def test_list2cmdline(self):
568        self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
569                         '"a b c" d e')
570        self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
571                         'ab\\"c \\ d')
572        self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
573                         'ab\\"c " \\\\" d')
574        self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
575                         'a\\\\\\b "de fg" h')
576        self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
577                         'a\\\\\\"b c d')
578        self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
579                         '"a\\\\b c" d e')
580        self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
581                         '"a\\\\b\\ c" d e')
582        self.assertEqual(subprocess.list2cmdline(['ab', '']),
583                         'ab ""')
584
585
586    def test_poll(self):
587        p = subprocess.Popen([sys.executable,
588                          "-c", "import time; time.sleep(1)"])
589        count = 0
590        while p.poll() is None:
591            time.sleep(0.1)
592            count += 1
593        # We expect that the poll loop probably went around about 10 times,
594        # but, based on system scheduling we can't control, it's possible
595        # poll() never returned None.  It "should be" very rare that it
596        # didn't go around at least twice.
597        self.assertGreaterEqual(count, 2)
598        # Subsequent invocations should just return the returncode
599        self.assertEqual(p.poll(), 0)
600
601
602    def test_wait(self):
603        p = subprocess.Popen([sys.executable,
604                          "-c", "import time; time.sleep(2)"])
605        self.assertEqual(p.wait(), 0)
606        # Subsequent invocations should just return the returncode
607        self.assertEqual(p.wait(), 0)
608
609
610    def test_invalid_bufsize(self):
611        # an invalid type of the bufsize argument should raise
612        # TypeError.
613        with self.assertRaises(TypeError):
614            subprocess.Popen([sys.executable, "-c", "pass"], "orange")
615
616    def test_leaking_fds_on_error(self):
617        # see bug #5179: Popen leaks file descriptors to PIPEs if
618        # the child fails to execute; this will eventually exhaust
619        # the maximum number of open fds. 1024 seems a very common
620        # value for that limit, but Windows has 2048, so we loop
621        # 1024 times (each call leaked two fds).
622        for i in range(1024):
623            # Windows raises IOError.  Others raise OSError.
624            with self.assertRaises(EnvironmentError) as c:
625                subprocess.Popen(['nonexisting_i_hope'],
626                                 stdout=subprocess.PIPE,
627                                 stderr=subprocess.PIPE)
628            # ignore errors that indicate the command was not found
629            if c.exception.errno not in (errno.ENOENT, errno.EACCES):
630                raise c.exception
631
632    def test_handles_closed_on_exception(self):
633        # If CreateProcess exits with an error, ensure the
634        # duplicate output handles are released
635        ifhandle, ifname = mkstemp()
636        ofhandle, ofname = mkstemp()
637        efhandle, efname = mkstemp()
638        try:
639            subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
640              stderr=efhandle)
641        except OSError:
642            os.close(ifhandle)
643            os.remove(ifname)
644            os.close(ofhandle)
645            os.remove(ofname)
646            os.close(efhandle)
647            os.remove(efname)
648        self.assertFalse(os.path.exists(ifname))
649        self.assertFalse(os.path.exists(ofname))
650        self.assertFalse(os.path.exists(efname))
651
652    def test_communicate_epipe(self):
653        # Issue 10963: communicate() should hide EPIPE
654        p = subprocess.Popen([sys.executable, "-c", 'pass'],
655                             stdin=subprocess.PIPE,
656                             stdout=subprocess.PIPE,
657                             stderr=subprocess.PIPE)
658        self.addCleanup(p.stdout.close)
659        self.addCleanup(p.stderr.close)
660        self.addCleanup(p.stdin.close)
661        p.communicate("x" * 2**20)
662
663    def test_communicate_epipe_only_stdin(self):
664        # Issue 10963: communicate() should hide EPIPE
665        p = subprocess.Popen([sys.executable, "-c", 'pass'],
666                             stdin=subprocess.PIPE)
667        self.addCleanup(p.stdin.close)
668        time.sleep(2)
669        p.communicate("x" * 2**20)
670
671    # This test is Linux-ish specific for simplicity to at least have
672    # some coverage.  It is not a platform specific bug.
673    @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
674                         "Linux specific")
675    def test_failed_child_execute_fd_leak(self):
676        """Test for the fork() failure fd leak reported in issue16327."""
677        fd_directory = '/proc/%d/fd' % os.getpid()
678        fds_before_popen = os.listdir(fd_directory)
679        with self.assertRaises(PopenTestException):
680            PopenExecuteChildRaises(
681                    [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
682                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
683
684        # NOTE: This test doesn't verify that the real _execute_child
685        # does not close the file descriptors itself on the way out
686        # during an exception.  Code inspection has confirmed that.
687
688        fds_after_exception = os.listdir(fd_directory)
689        self.assertEqual(fds_before_popen, fds_after_exception)
690
691
692# context manager
693class _SuppressCoreFiles(object):
694    """Try to prevent core files from being created."""
695    old_limit = None
696
697    def __enter__(self):
698        """Try to save previous ulimit, then set it to (0, 0)."""
699        if resource is not None:
700            try:
701                self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
702                resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
703            except (ValueError, resource.error):
704                pass
705
706        if sys.platform == 'darwin':
707            # Check if the 'Crash Reporter' on OSX was configured
708            # in 'Developer' mode and warn that it will get triggered
709            # when it is.
710            #
711            # This assumes that this context manager is used in tests
712            # that might trigger the next manager.
713            value = subprocess.Popen(['/usr/bin/defaults', 'read',
714                    'com.apple.CrashReporter', 'DialogType'],
715                    stdout=subprocess.PIPE).communicate()[0]
716            if value.strip() == b'developer':
717                print "this tests triggers the Crash Reporter, that is intentional"
718                sys.stdout.flush()
719
720    def __exit__(self, *args):
721        """Return core file behavior to default."""
722        if self.old_limit is None:
723            return
724        if resource is not None:
725            try:
726                resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
727            except (ValueError, resource.error):
728                pass
729
730    @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
731                         "Requires signal.SIGALRM")
732    def test_communicate_eintr(self):
733        # Issue #12493: communicate() should handle EINTR
734        def handler(signum, frame):
735            pass
736        old_handler = signal.signal(signal.SIGALRM, handler)
737        self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
738
739        # the process is running for 2 seconds
740        args = [sys.executable, "-c", 'import time; time.sleep(2)']
741        for stream in ('stdout', 'stderr'):
742            kw = {stream: subprocess.PIPE}
743            with subprocess.Popen(args, **kw) as process:
744                signal.alarm(1)
745                # communicate() will be interrupted by SIGALRM
746                process.communicate()
747
748
749@unittest.skipIf(mswindows, "POSIX specific tests")
750class POSIXProcessTestCase(BaseTestCase):
751
752    def test_exceptions(self):
753        # caught & re-raised exceptions
754        with self.assertRaises(OSError) as c:
755            p = subprocess.Popen([sys.executable, "-c", ""],
756                                 cwd="/this/path/does/not/exist")
757        # The attribute child_traceback should contain "os.chdir" somewhere.
758        self.assertIn("os.chdir", c.exception.child_traceback)
759
760    def test_run_abort(self):
761        # returncode handles signal termination
762        with _SuppressCoreFiles():
763            p = subprocess.Popen([sys.executable, "-c",
764                                  "import os; os.abort()"])
765            p.wait()
766        self.assertEqual(-p.returncode, signal.SIGABRT)
767
768    def test_preexec(self):
769        # preexec function
770        p = subprocess.Popen([sys.executable, "-c",
771                              "import sys, os;"
772                              "sys.stdout.write(os.getenv('FRUIT'))"],
773                             stdout=subprocess.PIPE,
774                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
775        self.addCleanup(p.stdout.close)
776        self.assertEqual(p.stdout.read(), "apple")
777
778    class _TestExecuteChildPopen(subprocess.Popen):
779        """Used to test behavior at the end of _execute_child."""
780        def __init__(self, testcase, *args, **kwargs):
781            self._testcase = testcase
782            subprocess.Popen.__init__(self, *args, **kwargs)
783
784        def _execute_child(
785                self, args, executable, preexec_fn, close_fds, cwd, env,
786                universal_newlines, startupinfo, creationflags, shell,
787                p2cread, p2cwrite,
788                c2pread, c2pwrite,
789                errread, errwrite):
790            try:
791                subprocess.Popen._execute_child(
792                        self, args, executable, preexec_fn, close_fds,
793                        cwd, env, universal_newlines,
794                        startupinfo, creationflags, shell,
795                        p2cread, p2cwrite,
796                        c2pread, c2pwrite,
797                        errread, errwrite)
798            finally:
799                # Open a bunch of file descriptors and verify that
800                # none of them are the same as the ones the Popen
801                # instance is using for stdin/stdout/stderr.
802                devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
803                               for _ in range(8)]
804                try:
805                    for fd in devzero_fds:
806                        self._testcase.assertNotIn(
807                                fd, (p2cwrite, c2pread, errread))
808                finally:
809                    map(os.close, devzero_fds)
810
811    @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
812    def test_preexec_errpipe_does_not_double_close_pipes(self):
813        """Issue16140: Don't double close pipes on preexec error."""
814
815        def raise_it():
816            raise RuntimeError("force the _execute_child() errpipe_data path.")
817
818        with self.assertRaises(RuntimeError):
819            self._TestExecuteChildPopen(
820                    self, [sys.executable, "-c", "pass"],
821                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
822                    stderr=subprocess.PIPE, preexec_fn=raise_it)
823
824    def test_args_string(self):
825        # args is a string
826        f, fname = mkstemp()
827        os.write(f, "#!/bin/sh\n")
828        os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
829                    sys.executable)
830        os.close(f)
831        os.chmod(fname, 0o700)
832        p = subprocess.Popen(fname)
833        p.wait()
834        os.remove(fname)
835        self.assertEqual(p.returncode, 47)
836
837    def test_invalid_args(self):
838        # invalid arguments should raise ValueError
839        self.assertRaises(ValueError, subprocess.call,
840                          [sys.executable, "-c",
841                           "import sys; sys.exit(47)"],
842                          startupinfo=47)
843        self.assertRaises(ValueError, subprocess.call,
844                          [sys.executable, "-c",
845                           "import sys; sys.exit(47)"],
846                          creationflags=47)
847
848    def test_shell_sequence(self):
849        # Run command through the shell (sequence)
850        newenv = os.environ.copy()
851        newenv["FRUIT"] = "apple"
852        p = subprocess.Popen(["echo $FRUIT"], shell=1,
853                             stdout=subprocess.PIPE,
854                             env=newenv)
855        self.addCleanup(p.stdout.close)
856        self.assertEqual(p.stdout.read().strip(), "apple")
857
858    def test_shell_string(self):
859        # Run command through the shell (string)
860        newenv = os.environ.copy()
861        newenv["FRUIT"] = "apple"
862        p = subprocess.Popen("echo $FRUIT", shell=1,
863                             stdout=subprocess.PIPE,
864                             env=newenv)
865        self.addCleanup(p.stdout.close)
866        self.assertEqual(p.stdout.read().strip(), "apple")
867
868    def test_call_string(self):
869        # call() function with string argument on UNIX
870        f, fname = mkstemp()
871        os.write(f, "#!/bin/sh\n")
872        os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
873                    sys.executable)
874        os.close(f)
875        os.chmod(fname, 0700)
876        rc = subprocess.call(fname)
877        os.remove(fname)
878        self.assertEqual(rc, 47)
879
880    def test_specific_shell(self):
881        # Issue #9265: Incorrect name passed as arg[0].
882        shells = []
883        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
884            for name in ['bash', 'ksh']:
885                sh = os.path.join(prefix, name)
886                if os.path.isfile(sh):
887                    shells.append(sh)
888        if not shells: # Will probably work for any shell but csh.
889            self.skipTest("bash or ksh required for this test")
890        sh = '/bin/sh'
891        if os.path.isfile(sh) and not os.path.islink(sh):
892            # Test will fail if /bin/sh is a symlink to csh.
893            shells.append(sh)
894        for sh in shells:
895            p = subprocess.Popen("echo $0", executable=sh, shell=True,
896                                 stdout=subprocess.PIPE)
897            self.addCleanup(p.stdout.close)
898            self.assertEqual(p.stdout.read().strip(), sh)
899
900    def _kill_process(self, method, *args):
901        # Do not inherit file handles from the parent.
902        # It should fix failures on some platforms.
903        p = subprocess.Popen([sys.executable, "-c", """if 1:
904                             import sys, time
905                             sys.stdout.write('x\\n')
906                             sys.stdout.flush()
907                             time.sleep(30)
908                             """],
909                             close_fds=True,
910                             stdin=subprocess.PIPE,
911                             stdout=subprocess.PIPE,
912                             stderr=subprocess.PIPE)
913        # Wait for the interpreter to be completely initialized before
914        # sending any signal.
915        p.stdout.read(1)
916        getattr(p, method)(*args)
917        return p
918
919    @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
920                     "Due to known OS bug (issue #16762)")
921    def _kill_dead_process(self, method, *args):
922        # Do not inherit file handles from the parent.
923        # It should fix failures on some platforms.
924        p = subprocess.Popen([sys.executable, "-c", """if 1:
925                             import sys, time
926                             sys.stdout.write('x\\n')
927                             sys.stdout.flush()
928                             """],
929                             close_fds=True,
930                             stdin=subprocess.PIPE,
931                             stdout=subprocess.PIPE,
932                             stderr=subprocess.PIPE)
933        # Wait for the interpreter to be completely initialized before
934        # sending any signal.
935        p.stdout.read(1)
936        # The process should end after this
937        time.sleep(1)
938        # This shouldn't raise even though the child is now dead
939        getattr(p, method)(*args)
940        p.communicate()
941
942    def test_send_signal(self):
943        p = self._kill_process('send_signal', signal.SIGINT)
944        _, stderr = p.communicate()
945        self.assertIn('KeyboardInterrupt', stderr)
946        self.assertNotEqual(p.wait(), 0)
947
948    def test_kill(self):
949        p = self._kill_process('kill')
950        _, stderr = p.communicate()
951        self.assertStderrEqual(stderr, '')
952        self.assertEqual(p.wait(), -signal.SIGKILL)
953
954    def test_terminate(self):
955        p = self._kill_process('terminate')
956        _, stderr = p.communicate()
957        self.assertStderrEqual(stderr, '')
958        self.assertEqual(p.wait(), -signal.SIGTERM)
959
960    def test_send_signal_dead(self):
961        # Sending a signal to a dead process
962        self._kill_dead_process('send_signal', signal.SIGINT)
963
964    def test_kill_dead(self):
965        # Killing a dead process
966        self._kill_dead_process('kill')
967
968    def test_terminate_dead(self):
969        # Terminating a dead process
970        self._kill_dead_process('terminate')
971
972    def check_close_std_fds(self, fds):
973        # Issue #9905: test that subprocess pipes still work properly with
974        # some standard fds closed
975        stdin = 0
976        newfds = []
977        for a in fds:
978            b = os.dup(a)
979            newfds.append(b)
980            if a == 0:
981                stdin = b
982        try:
983            for fd in fds:
984                os.close(fd)
985            out, err = subprocess.Popen([sys.executable, "-c",
986                              'import sys;'
987                              'sys.stdout.write("apple");'
988                              'sys.stdout.flush();'
989                              'sys.stderr.write("orange")'],
990                       stdin=stdin,
991                       stdout=subprocess.PIPE,
992                       stderr=subprocess.PIPE).communicate()
993            err = test_support.strip_python_stderr(err)
994            self.assertEqual((out, err), (b'apple', b'orange'))
995        finally:
996            for b, a in zip(newfds, fds):
997                os.dup2(b, a)
998            for b in newfds:
999                os.close(b)
1000
1001    def test_close_fd_0(self):
1002        self.check_close_std_fds([0])
1003
1004    def test_close_fd_1(self):
1005        self.check_close_std_fds([1])
1006
1007    def test_close_fd_2(self):
1008        self.check_close_std_fds([2])
1009
1010    def test_close_fds_0_1(self):
1011        self.check_close_std_fds([0, 1])
1012
1013    def test_close_fds_0_2(self):
1014        self.check_close_std_fds([0, 2])
1015
1016    def test_close_fds_1_2(self):
1017        self.check_close_std_fds([1, 2])
1018
1019    def test_close_fds_0_1_2(self):
1020        # Issue #10806: test that subprocess pipes still work properly with
1021        # all standard fds closed.
1022        self.check_close_std_fds([0, 1, 2])
1023
1024    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
1025        # open up some temporary files
1026        temps = [mkstemp() for i in range(3)]
1027        temp_fds = [fd for fd, fname in temps]
1028        try:
1029            # unlink the files -- we won't need to reopen them
1030            for fd, fname in temps:
1031                os.unlink(fname)
1032
1033            # save a copy of the standard file descriptors
1034            saved_fds = [os.dup(fd) for fd in range(3)]
1035            try:
1036                # duplicate the temp files over the standard fd's 0, 1, 2
1037                for fd, temp_fd in enumerate(temp_fds):
1038                    os.dup2(temp_fd, fd)
1039
1040                # write some data to what will become stdin, and rewind
1041                os.write(stdin_no, b"STDIN")
1042                os.lseek(stdin_no, 0, 0)
1043
1044                # now use those files in the given order, so that subprocess
1045                # has to rearrange them in the child
1046                p = subprocess.Popen([sys.executable, "-c",
1047                    'import sys; got = sys.stdin.read();'
1048                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
1049                    stdin=stdin_no,
1050                    stdout=stdout_no,
1051                    stderr=stderr_no)
1052                p.wait()
1053
1054                for fd in temp_fds:
1055                    os.lseek(fd, 0, 0)
1056
1057                out = os.read(stdout_no, 1024)
1058                err = test_support.strip_python_stderr(os.read(stderr_no, 1024))
1059            finally:
1060                for std, saved in enumerate(saved_fds):
1061                    os.dup2(saved, std)
1062                    os.close(saved)
1063
1064            self.assertEqual(out, b"got STDIN")
1065            self.assertEqual(err, b"err")
1066
1067        finally:
1068            for fd in temp_fds:
1069                os.close(fd)
1070
1071    # When duping fds, if there arises a situation where one of the fds is
1072    # either 0, 1 or 2, it is possible that it is overwritten (#12607).
1073    # This tests all combinations of this.
1074    def test_swap_fds(self):
1075        self.check_swap_fds(0, 1, 2)
1076        self.check_swap_fds(0, 2, 1)
1077        self.check_swap_fds(1, 0, 2)
1078        self.check_swap_fds(1, 2, 0)
1079        self.check_swap_fds(2, 0, 1)
1080        self.check_swap_fds(2, 1, 0)
1081
1082    def test_wait_when_sigchild_ignored(self):
1083        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
1084        sigchild_ignore = test_support.findfile("sigchild_ignore.py",
1085                                                subdir="subprocessdata")
1086        p = subprocess.Popen([sys.executable, sigchild_ignore],
1087                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1088        stdout, stderr = p.communicate()
1089        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
1090                         " non-zero with this error:\n%s" % stderr)
1091
1092    def test_zombie_fast_process_del(self):
1093        # Issue #12650: on Unix, if Popen.__del__() was called before the
1094        # process exited, it wouldn't be added to subprocess._active, and would
1095        # remain a zombie.
1096        # spawn a Popen, and delete its reference before it exits
1097        p = subprocess.Popen([sys.executable, "-c",
1098                              'import sys, time;'
1099                              'time.sleep(0.2)'],
1100                             stdout=subprocess.PIPE,
1101                             stderr=subprocess.PIPE)
1102        self.addCleanup(p.stdout.close)
1103        self.addCleanup(p.stderr.close)
1104        ident = id(p)
1105        pid = p.pid
1106        del p
1107        # check that p is in the active processes list
1108        self.assertIn(ident, [id(o) for o in subprocess._active])
1109
1110    def test_leak_fast_process_del_killed(self):
1111        # Issue #12650: on Unix, if Popen.__del__() was called before the
1112        # process exited, and the process got killed by a signal, it would never
1113        # be removed from subprocess._active, which triggered a FD and memory
1114        # leak.
1115        # spawn a Popen, delete its reference and kill it
1116        p = subprocess.Popen([sys.executable, "-c",
1117                              'import time;'
1118                              'time.sleep(3)'],
1119                             stdout=subprocess.PIPE,
1120                             stderr=subprocess.PIPE)
1121        self.addCleanup(p.stdout.close)
1122        self.addCleanup(p.stderr.close)
1123        ident = id(p)
1124        pid = p.pid
1125        del p
1126        os.kill(pid, signal.SIGKILL)
1127        # check that p is in the active processes list
1128        self.assertIn(ident, [id(o) for o in subprocess._active])
1129
1130        # let some time for the process to exit, and create a new Popen: this
1131        # should trigger the wait() of p
1132        time.sleep(0.2)
1133        with self.assertRaises(EnvironmentError) as c:
1134            with subprocess.Popen(['nonexisting_i_hope'],
1135                                  stdout=subprocess.PIPE,
1136                                  stderr=subprocess.PIPE) as proc:
1137                pass
1138        # p should have been wait()ed on, and removed from the _active list
1139        self.assertRaises(OSError, os.waitpid, pid, 0)
1140        self.assertNotIn(ident, [id(o) for o in subprocess._active])
1141
1142    def test_pipe_cloexec(self):
1143        # Issue 12786: check that the communication pipes' FDs are set CLOEXEC,
1144        # and are not inherited by another child process.
1145        p1 = subprocess.Popen([sys.executable, "-c",
1146                               'import os;'
1147                               'os.read(0, 1)'
1148                              ],
1149                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1150                              stderr=subprocess.PIPE)
1151
1152        p2 = subprocess.Popen([sys.executable, "-c", """if True:
1153                               import os, errno, sys
1154                               for fd in %r:
1155                                   try:
1156                                       os.close(fd)
1157                                   except OSError as e:
1158                                       if e.errno != errno.EBADF:
1159                                           raise
1160                                   else:
1161                                       sys.exit(1)
1162                               sys.exit(0)
1163                               """ % [f.fileno() for f in (p1.stdin, p1.stdout,
1164                                                           p1.stderr)]
1165                              ],
1166                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1167                              stderr=subprocess.PIPE, close_fds=False)
1168        p1.communicate('foo')
1169        _, stderr = p2.communicate()
1170
1171        self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr))
1172
1173
1174@unittest.skipUnless(mswindows, "Windows specific tests")
1175class Win32ProcessTestCase(BaseTestCase):
1176
1177    def test_startupinfo(self):
1178        # startupinfo argument
1179        # We uses hardcoded constants, because we do not want to
1180        # depend on win32all.
1181        STARTF_USESHOWWINDOW = 1
1182        SW_MAXIMIZE = 3
1183        startupinfo = subprocess.STARTUPINFO()
1184        startupinfo.dwFlags = STARTF_USESHOWWINDOW
1185        startupinfo.wShowWindow = SW_MAXIMIZE
1186        # Since Python is a console process, it won't be affected
1187        # by wShowWindow, but the argument should be silently
1188        # ignored
1189        subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
1190                        startupinfo=startupinfo)
1191
1192    def test_creationflags(self):
1193        # creationflags argument
1194        CREATE_NEW_CONSOLE = 16
1195        sys.stderr.write("    a DOS box should flash briefly ...\n")
1196        subprocess.call(sys.executable +
1197                        ' -c "import time; time.sleep(0.25)"',
1198                        creationflags=CREATE_NEW_CONSOLE)
1199
1200    def test_invalid_args(self):
1201        # invalid arguments should raise ValueError
1202        self.assertRaises(ValueError, subprocess.call,
1203                          [sys.executable, "-c",
1204                           "import sys; sys.exit(47)"],
1205                          preexec_fn=lambda: 1)
1206        self.assertRaises(ValueError, subprocess.call,
1207                          [sys.executable, "-c",
1208                           "import sys; sys.exit(47)"],
1209                          stdout=subprocess.PIPE,
1210                          close_fds=True)
1211
1212    def test_close_fds(self):
1213        # close file descriptors
1214        rc = subprocess.call([sys.executable, "-c",
1215                              "import sys; sys.exit(47)"],
1216                              close_fds=True)
1217        self.assertEqual(rc, 47)
1218
1219    def test_shell_sequence(self):
1220        # Run command through the shell (sequence)
1221        newenv = os.environ.copy()
1222        newenv["FRUIT"] = "physalis"
1223        p = subprocess.Popen(["set"], shell=1,
1224                             stdout=subprocess.PIPE,
1225                             env=newenv)
1226        self.addCleanup(p.stdout.close)
1227        self.assertIn("physalis", p.stdout.read())
1228
1229    def test_shell_string(self):
1230        # Run command through the shell (string)
1231        newenv = os.environ.copy()
1232        newenv["FRUIT"] = "physalis"
1233        p = subprocess.Popen("set", shell=1,
1234                             stdout=subprocess.PIPE,
1235                             env=newenv)
1236        self.addCleanup(p.stdout.close)
1237        self.assertIn("physalis", p.stdout.read())
1238
1239    def test_call_string(self):
1240        # call() function with string argument on Windows
1241        rc = subprocess.call(sys.executable +
1242                             ' -c "import sys; sys.exit(47)"')
1243        self.assertEqual(rc, 47)
1244
1245    def _kill_process(self, method, *args):
1246        # Some win32 buildbot raises EOFError if stdin is inherited
1247        p = subprocess.Popen([sys.executable, "-c", """if 1:
1248                             import sys, time
1249                             sys.stdout.write('x\\n')
1250                             sys.stdout.flush()
1251                             time.sleep(30)
1252                             """],
1253                             stdin=subprocess.PIPE,
1254                             stdout=subprocess.PIPE,
1255                             stderr=subprocess.PIPE)
1256        self.addCleanup(p.stdout.close)
1257        self.addCleanup(p.stderr.close)
1258        self.addCleanup(p.stdin.close)
1259        # Wait for the interpreter to be completely initialized before
1260        # sending any signal.
1261        p.stdout.read(1)
1262        getattr(p, method)(*args)
1263        _, stderr = p.communicate()
1264        self.assertStderrEqual(stderr, '')
1265        returncode = p.wait()
1266        self.assertNotEqual(returncode, 0)
1267
1268    def _kill_dead_process(self, method, *args):
1269        p = subprocess.Popen([sys.executable, "-c", """if 1:
1270                             import sys, time
1271                             sys.stdout.write('x\\n')
1272                             sys.stdout.flush()
1273                             sys.exit(42)
1274                             """],
1275                             stdin=subprocess.PIPE,
1276                             stdout=subprocess.PIPE,
1277                             stderr=subprocess.PIPE)
1278        self.addCleanup(p.stdout.close)
1279        self.addCleanup(p.stderr.close)
1280        self.addCleanup(p.stdin.close)
1281        # Wait for the interpreter to be completely initialized before
1282        # sending any signal.
1283        p.stdout.read(1)
1284        # The process should end after this
1285        time.sleep(1)
1286        # This shouldn't raise even though the child is now dead
1287        getattr(p, method)(*args)
1288        _, stderr = p.communicate()
1289        self.assertStderrEqual(stderr, b'')
1290        rc = p.wait()
1291        self.assertEqual(rc, 42)
1292
1293    def test_send_signal(self):
1294        self._kill_process('send_signal', signal.SIGTERM)
1295
1296    def test_kill(self):
1297        self._kill_process('kill')
1298
1299    def test_terminate(self):
1300        self._kill_process('terminate')
1301
1302    def test_send_signal_dead(self):
1303        self._kill_dead_process('send_signal', signal.SIGTERM)
1304
1305    def test_kill_dead(self):
1306        self._kill_dead_process('kill')
1307
1308    def test_terminate_dead(self):
1309        self._kill_dead_process('terminate')
1310
1311
1312@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
1313                     "poll system call not supported")
1314class ProcessTestCaseNoPoll(ProcessTestCase):
1315    def setUp(self):
1316        subprocess._has_poll = False
1317        ProcessTestCase.setUp(self)
1318
1319    def tearDown(self):
1320        subprocess._has_poll = True
1321        ProcessTestCase.tearDown(self)
1322
1323
1324class HelperFunctionTests(unittest.TestCase):
1325    @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
1326    def test_eintr_retry_call(self):
1327        record_calls = []
1328        def fake_os_func(*args):
1329            record_calls.append(args)
1330            if len(record_calls) == 2:
1331                raise OSError(errno.EINTR, "fake interrupted system call")
1332            return tuple(reversed(args))
1333
1334        self.assertEqual((999, 256),
1335                         subprocess._eintr_retry_call(fake_os_func, 256, 999))
1336        self.assertEqual([(256, 999)], record_calls)
1337        # This time there will be an EINTR so it will loop once.
1338        self.assertEqual((666,),
1339                         subprocess._eintr_retry_call(fake_os_func, 666))
1340        self.assertEqual([(256, 999), (666,), (666,)], record_calls)
1341
1342@unittest.skipUnless(mswindows, "mswindows only")
1343class CommandsWithSpaces (BaseTestCase):
1344
1345    def setUp(self):
1346        super(CommandsWithSpaces, self).setUp()
1347        f, fname = mkstemp(".py", "te st")
1348        self.fname = fname.lower ()
1349        os.write(f, b"import sys;"
1350                    b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
1351        )
1352        os.close(f)
1353
1354    def tearDown(self):
1355        os.remove(self.fname)
1356        super(CommandsWithSpaces, self).tearDown()
1357
1358    def with_spaces(self, *args, **kwargs):
1359        kwargs['stdout'] = subprocess.PIPE
1360        p = subprocess.Popen(*args, **kwargs)
1361        self.addCleanup(p.stdout.close)
1362        self.assertEqual(
1363          p.stdout.read ().decode("mbcs"),
1364          "2 [%r, 'ab cd']" % self.fname
1365        )
1366
1367    def test_shell_string_with_spaces(self):
1368        # call() function with string argument with spaces on Windows
1369        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
1370                                             "ab cd"), shell=1)
1371
1372    def test_shell_sequence_with_spaces(self):
1373        # call() function with sequence argument with spaces on Windows
1374        self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
1375
1376    def test_noshell_string_with_spaces(self):
1377        # call() function with string argument with spaces on Windows
1378        self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
1379                             "ab cd"))
1380
1381    def test_noshell_sequence_with_spaces(self):
1382        # call() function with sequence argument with spaces on Windows
1383        self.with_spaces([sys.executable, self.fname, "ab cd"])
1384
1385def test_main():
1386    unit_tests = (ProcessTestCase,
1387                  POSIXProcessTestCase,
1388                  Win32ProcessTestCase,
1389                  ProcessTestCaseNoPoll,
1390                  HelperFunctionTests,
1391                  CommandsWithSpaces)
1392
1393    test_support.run_unittest(*unit_tests)
1394    test_support.reap_children()
1395
1396if __name__ == "__main__":
1397    test_main()
1398