1#! /usr/bin/env python
2"""Test script for popen2.py"""
3
4import warnings
# Ignore the DeprecationWarnings whose messages match the popen2 module
# and os.popen*() deprecation texts; this test uses those APIs on purpose.
warnings.filterwarnings("ignore", r".*popen2 module is deprecated.*",
                        DeprecationWarning)
# Raw string so that "\." reaches the regex engine as an escaped dot
# instead of relying on an unrecognised string escape being passed through.
warnings.filterwarnings("ignore", r"os\.popen. is deprecated.*",
                        DeprecationWarning)
9
10import os
11import sys
12import unittest
13import popen2
14
15from test.test_support import run_unittest, reap_children
16
# BeOS / AtheOS: mixing "posix" fork & exec with native threads is
# generally discouraged there and reportedly messes up locks, so the
# whole test module is skipped on those platforms.
if sys.platform.startswith(('beos', 'atheos')):
    raise unittest.SkipTest("popen2() doesn't work on " + sys.platform)
22
# This test needs os.popen or, failing that, os.fork.  Probe for them in
# that order; when neither exists the second import raises ImportError,
# which is left to propagate so that the test gets skipped.
try:
    from os import popen as _probe
except ImportError:
    from os import fork as _probe
# The name was only bound to check availability.
del _probe
32
33class Popen2Test(unittest.TestCase):
34    cmd = "cat"
35    if os.name == "nt":
36        cmd = "more"
37    teststr = "ab cd\n"
38    # "more" doesn't act the same way across Windows flavors,
39    # sometimes adding an extra newline at the start or the
40    # end.  So we strip whitespace off both ends for comparison.
41    expected = teststr.strip()
42
43    def setUp(self):
44        popen2._cleanup()
45        # When the test runs, there shouldn't be any open pipes
46        self.assertFalse(popen2._active, "Active pipes when test starts" +
47            repr([c.cmd for c in popen2._active]))
48
49    def tearDown(self):
50        for inst in popen2._active:
51            inst.wait()
52        popen2._cleanup()
53        self.assertFalse(popen2._active, "popen2._active not empty")
54        # The os.popen*() API delegates to the subprocess module (on Unix)
55        import subprocess
56        for inst in subprocess._active:
57            inst.wait()
58        subprocess._cleanup()
59        self.assertFalse(subprocess._active, "subprocess._active not empty")
60        reap_children()
61
62    def validate_output(self, teststr, expected_out, r, w, e=None):
63        w.write(teststr)
64        w.close()
65        got = r.read()
66        self.assertEqual(expected_out, got.strip(), "wrote %r read %r" %
67                         (teststr, got))
68
69        if e is not None:
70            got = e.read()
71            self.assertFalse(got, "unexpected %r on stderr" % got)
72
73    def test_popen2(self):
74        r, w = popen2.popen2(self.cmd)
75        self.validate_output(self.teststr, self.expected, r, w)
76
77    def test_popen3(self):
78        if os.name == 'posix':
79            r, w, e = popen2.popen3([self.cmd])
80            self.validate_output(self.teststr, self.expected, r, w, e)
81
82        r, w, e = popen2.popen3(self.cmd)
83        self.validate_output(self.teststr, self.expected, r, w, e)
84
85    def test_os_popen2(self):
86        # same test as test_popen2(), but using the os.popen*() API
87        if os.name == 'posix':
88            w, r = os.popen2([self.cmd])
89            self.validate_output(self.teststr, self.expected, r, w)
90
91            w, r = os.popen2(["echo", self.teststr])
92            got = r.read()
93            self.assertEqual(got, self.teststr + "\n")
94
95        w, r = os.popen2(self.cmd)
96        self.validate_output(self.teststr, self.expected, r, w)
97
98    def test_os_popen3(self):
99        # same test as test_popen3(), but using the os.popen*() API
100        if os.name == 'posix':
101            w, r, e = os.popen3([self.cmd])
102            self.validate_output(self.teststr, self.expected, r, w, e)
103
104            w, r, e = os.popen3(["echo", self.teststr])
105            got = r.read()
106            self.assertEqual(got, self.teststr + "\n")
107            got = e.read()
108            self.assertFalse(got, "unexpected %r on stderr" % got)
109
110        w, r, e = os.popen3(self.cmd)
111        self.validate_output(self.teststr, self.expected, r, w, e)
112
113    def test_os_popen4(self):
114        if os.name == 'posix':
115            w, r = os.popen4([self.cmd])
116            self.validate_output(self.teststr, self.expected, r, w)
117
118            w, r = os.popen4(["echo", self.teststr])
119            got = r.read()
120            self.assertEqual(got, self.teststr + "\n")
121
122        w, r = os.popen4(self.cmd)
123        self.validate_output(self.teststr, self.expected, r, w)
124
125
def test_main():
    """Run every test in Popen2Test through run_unittest()."""
    test_classes = (Popen2Test,)
    run_unittest(*test_classes)
128
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
131