1from test.support import verbose, import_module, reap_children
2
3# Skip these tests if termios is not available
4import_module('termios')
5
6import errno
7import pty
8import os
9import sys
10import select
11import signal
12import socket
13import unittest
14
15TEST_STRING_1 = b"I wish to buy a fish license.\n"
16TEST_STRING_2 = b"For my pet fish, Eric.\n"
17
18if verbose:
19    def debug(msg):
20        print(msg)
21else:
22    def debug(msg):
23        pass
24
25
26def normalize_output(data):
27    # Some operating systems do conversions on newline.  We could possibly
28    # fix that by doing the appropriate termios.tcsetattr()s.  I couldn't
29    # figure out the right combo on Tru64 and I don't have an IRIX box.
30    # So just normalize the output and doc the problem O/Ses by allowing
31    # certain combinations for some platforms, but avoid allowing other
32    # differences (like extra whitespace, trailing garbage, etc.)
33
34    # This is about the best we can do without getting some feedback
35    # from someone more knowledgable.
36
37    # OSF/1 (Tru64) apparently turns \n into \r\r\n.
38    if data.endswith(b'\r\r\n'):
39        return data.replace(b'\r\r\n', b'\n')
40
41    # IRIX apparently turns \n into \r\n.
42    if data.endswith(b'\r\n'):
43        return data.replace(b'\r\n', b'\n')
44
45    return data
46
47
48# Marginal testing of pty suite. Cannot do extensive 'do or fail' testing
49# because pty code is not too portable.
50# XXX(nnorwitz):  these tests leak fds when there is an error.
51class PtyTest(unittest.TestCase):
52    def setUp(self):
53        # isatty() and close() can hang on some platforms.  Set an alarm
54        # before running the test to make sure we don't hang forever.
55        self.old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
56        signal.alarm(10)
57
58    def tearDown(self):
59        # remove alarm, restore old alarm handler
60        signal.alarm(0)
61        signal.signal(signal.SIGALRM, self.old_alarm)
62
63    def handle_sig(self, sig, frame):
64        self.fail("isatty hung")
65
66    def test_basic(self):
67        try:
68            debug("Calling master_open()")
69            master_fd, slave_name = pty.master_open()
70            debug("Got master_fd '%d', slave_name '%s'" %
71                  (master_fd, slave_name))
72            debug("Calling slave_open(%r)" % (slave_name,))
73            slave_fd = pty.slave_open(slave_name)
74            debug("Got slave_fd '%d'" % slave_fd)
75        except OSError:
76            # " An optional feature could not be imported " ... ?
77            raise unittest.SkipTest("Pseudo-terminals (seemingly) not functional.")
78
79        self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
80
81        # Solaris requires reading the fd before anything is returned.
82        # My guess is that since we open and close the slave fd
83        # in master_open(), we need to read the EOF.
84
85        # Ensure the fd is non-blocking in case there's nothing to read.
86        blocking = os.get_blocking(master_fd)
87        try:
88            os.set_blocking(master_fd, False)
89            try:
90                s1 = os.read(master_fd, 1024)
91                self.assertEqual(b'', s1)
92            except OSError as e:
93                if e.errno != errno.EAGAIN:
94                    raise
95        finally:
96            # Restore the original flags.
97            os.set_blocking(master_fd, blocking)
98
99        debug("Writing to slave_fd")
100        os.write(slave_fd, TEST_STRING_1)
101        s1 = os.read(master_fd, 1024)
102        self.assertEqual(b'I wish to buy a fish license.\n',
103                         normalize_output(s1))
104
105        debug("Writing chunked output")
106        os.write(slave_fd, TEST_STRING_2[:5])
107        os.write(slave_fd, TEST_STRING_2[5:])
108        s2 = os.read(master_fd, 1024)
109        self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2))
110
111        os.close(slave_fd)
112        os.close(master_fd)
113
114
115    def test_fork(self):
116        debug("calling pty.fork()")
117        pid, master_fd = pty.fork()
118        if pid == pty.CHILD:
119            # stdout should be connected to a tty.
120            if not os.isatty(1):
121                debug("Child's fd 1 is not a tty?!")
122                os._exit(3)
123
124            # After pty.fork(), the child should already be a session leader.
125            # (on those systems that have that concept.)
126            debug("In child, calling os.setsid()")
127            try:
128                os.setsid()
129            except OSError:
130                # Good, we already were session leader
131                debug("Good: OSError was raised.")
132                pass
133            except AttributeError:
134                # Have pty, but not setsid()?
135                debug("No setsid() available?")
136                pass
137            except:
138                # We don't want this error to propagate, escaping the call to
139                # os._exit() and causing very peculiar behavior in the calling
140                # regrtest.py !
141                # Note: could add traceback printing here.
142                debug("An unexpected error was raised.")
143                os._exit(1)
144            else:
145                debug("os.setsid() succeeded! (bad!)")
146                os._exit(2)
147            os._exit(4)
148        else:
149            debug("Waiting for child (%d) to finish." % pid)
150            # In verbose mode, we have to consume the debug output from the
151            # child or the child will block, causing this test to hang in the
152            # parent's waitpid() call.  The child blocks after a
153            # platform-dependent amount of data is written to its fd.  On
154            # Linux 2.6, it's 4000 bytes and the child won't block, but on OS
155            # X even the small writes in the child above will block it.  Also
156            # on Linux, the read() will raise an OSError (input/output error)
157            # when it tries to read past the end of the buffer but the child's
158            # already exited, so catch and discard those exceptions.  It's not
159            # worth checking for EIO.
160            while True:
161                try:
162                    data = os.read(master_fd, 80)
163                except OSError:
164                    break
165                if not data:
166                    break
167                sys.stdout.write(str(data.replace(b'\r\n', b'\n'),
168                                     encoding='ascii'))
169
170            ##line = os.read(master_fd, 80)
171            ##lines = line.replace('\r\n', '\n').split('\n')
172            ##if False and lines != ['In child, calling os.setsid()',
173            ##             'Good: OSError was raised.', '']:
174            ##    raise TestFailed("Unexpected output from child: %r" % line)
175
176            (pid, status) = os.waitpid(pid, 0)
177            res = status >> 8
178            debug("Child (%d) exited with status %d (%d)." % (pid, res, status))
179            if res == 1:
180                self.fail("Child raised an unexpected exception in os.setsid()")
181            elif res == 2:
182                self.fail("pty.fork() failed to make child a session leader.")
183            elif res == 3:
184                self.fail("Child spawned by pty.fork() did not have a tty as stdout")
185            elif res != 4:
186                self.fail("pty.fork() failed for unknown reasons.")
187
188            ##debug("Reading from master_fd now that the child has exited")
189            ##try:
190            ##    s1 = os.read(master_fd, 1024)
191            ##except OSError:
192            ##    pass
193            ##else:
194            ##    raise TestFailed("Read from master_fd did not raise exception")
195
196        os.close(master_fd)
197
198        # pty.fork() passed.
199
200
201class SmallPtyTests(unittest.TestCase):
202    """These tests don't spawn children or hang."""
203
204    def setUp(self):
205        self.orig_stdin_fileno = pty.STDIN_FILENO
206        self.orig_stdout_fileno = pty.STDOUT_FILENO
207        self.orig_pty_select = pty.select
208        self.fds = []  # A list of file descriptors to close.
209        self.files = []
210        self.select_rfds_lengths = []
211        self.select_rfds_results = []
212
213    def tearDown(self):
214        pty.STDIN_FILENO = self.orig_stdin_fileno
215        pty.STDOUT_FILENO = self.orig_stdout_fileno
216        pty.select = self.orig_pty_select
217        for file in self.files:
218            try:
219                file.close()
220            except OSError:
221                pass
222        for fd in self.fds:
223            try:
224                os.close(fd)
225            except OSError:
226                pass
227
228    def _pipe(self):
229        pipe_fds = os.pipe()
230        self.fds.extend(pipe_fds)
231        return pipe_fds
232
233    def _socketpair(self):
234        socketpair = socket.socketpair()
235        self.files.extend(socketpair)
236        return socketpair
237
238    def _mock_select(self, rfds, wfds, xfds):
239        # This will raise IndexError when no more expected calls exist.
240        self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds))
241        return self.select_rfds_results.pop(0), [], []
242
243    def test__copy_to_each(self):
244        """Test the normal data case on both master_fd and stdin."""
245        read_from_stdout_fd, mock_stdout_fd = self._pipe()
246        pty.STDOUT_FILENO = mock_stdout_fd
247        mock_stdin_fd, write_to_stdin_fd = self._pipe()
248        pty.STDIN_FILENO = mock_stdin_fd
249        socketpair = self._socketpair()
250        masters = [s.fileno() for s in socketpair]
251
252        # Feed data.  Smaller than PIPEBUF.  These writes will not block.
253        os.write(masters[1], b'from master')
254        os.write(write_to_stdin_fd, b'from stdin')
255
256        # Expect two select calls, the last one will cause IndexError
257        pty.select = self._mock_select
258        self.select_rfds_lengths.append(2)
259        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
260        self.select_rfds_lengths.append(2)
261
262        with self.assertRaises(IndexError):
263            pty._copy(masters[0])
264
265        # Test that the right data went to the right places.
266        rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
267        self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
268        self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
269        self.assertEqual(os.read(masters[1], 20), b'from stdin')
270
271    def test__copy_eof_on_all(self):
272        """Test the empty read EOF case on both master_fd and stdin."""
273        read_from_stdout_fd, mock_stdout_fd = self._pipe()
274        pty.STDOUT_FILENO = mock_stdout_fd
275        mock_stdin_fd, write_to_stdin_fd = self._pipe()
276        pty.STDIN_FILENO = mock_stdin_fd
277        socketpair = self._socketpair()
278        masters = [s.fileno() for s in socketpair]
279
280        socketpair[1].close()
281        os.close(write_to_stdin_fd)
282
283        # Expect two select calls, the last one will cause IndexError
284        pty.select = self._mock_select
285        self.select_rfds_lengths.append(2)
286        self.select_rfds_results.append([mock_stdin_fd, masters[0]])
287        # We expect that both fds were removed from the fds list as they
288        # both encountered an EOF before the second select call.
289        self.select_rfds_lengths.append(0)
290
291        with self.assertRaises(IndexError):
292            pty._copy(masters[0])
293
294
295def tearDownModule():
296    reap_children()
297
298if __name__ == "__main__":
299    unittest.main()
300