test_poll.py revision 03241e801780edd967923d1cce00c2d07b208e58
1# Test case for the os.poll() function
2
3import os
4import subprocess
5import random
6import select
7from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
8try:
9    import threading
10except ImportError:
11    threading = None
12import time
13import unittest
14from test.support import TESTFN, run_unittest, reap_threads
15
16try:
17    select.poll
18except AttributeError:
19    raise unittest.SkipTest("select.poll not defined")
20
21
22def find_ready_matching(ready, flag):
23    match = []
24    for fd, mode in ready:
25        if mode & flag:
26            match.append(fd)
27    return match
28
29class PollTests(unittest.TestCase):
30
31    def test_poll1(self):
32        # Basic functional test of poll object
33        # Create a bunch of pipe and test that poll works with them.
34
35        p = select.poll()
36
37        NUM_PIPES = 12
38        MSG = b" This is a test."
39        MSG_LEN = len(MSG)
40        readers = []
41        writers = []
42        r2w = {}
43        w2r = {}
44
45        for i in range(NUM_PIPES):
46            rd, wr = os.pipe()
47            p.register(rd)
48            p.modify(rd, select.POLLIN)
49            p.register(wr, select.POLLOUT)
50            readers.append(rd)
51            writers.append(wr)
52            r2w[rd] = wr
53            w2r[wr] = rd
54
55        bufs = []
56
57        while writers:
58            ready = p.poll()
59            ready_writers = find_ready_matching(ready, select.POLLOUT)
60            if not ready_writers:
61                raise RuntimeError("no pipes ready for writing")
62            wr = random.choice(ready_writers)
63            os.write(wr, MSG)
64
65            ready = p.poll()
66            ready_readers = find_ready_matching(ready, select.POLLIN)
67            if not ready_readers:
68                raise RuntimeError("no pipes ready for reading")
69            rd = random.choice(ready_readers)
70            buf = os.read(rd, MSG_LEN)
71            self.assertEqual(len(buf), MSG_LEN)
72            bufs.append(buf)
73            os.close(r2w[rd]) ; os.close( rd )
74            p.unregister( r2w[rd] )
75            p.unregister( rd )
76            writers.remove(r2w[rd])
77
78        self.assertEqual(bufs, [MSG] * NUM_PIPES)
79
80    def test_poll_unit_tests(self):
81        # returns NVAL for invalid file descriptor
82        FD, w = os.pipe()
83        os.close(FD)
84        os.close(w)
85        p = select.poll()
86        p.register(FD)
87        r = p.poll()
88        self.assertEqual(r[0], (FD, select.POLLNVAL))
89
90        f = open(TESTFN, 'w')
91        fd = f.fileno()
92        p = select.poll()
93        p.register(f)
94        r = p.poll()
95        self.assertEqual(r[0][0], fd)
96        f.close()
97        r = p.poll()
98        self.assertEqual(r[0], (fd, select.POLLNVAL))
99        os.unlink(TESTFN)
100
101        # type error for invalid arguments
102        p = select.poll()
103        self.assertRaises(TypeError, p.register, p)
104        self.assertRaises(TypeError, p.unregister, p)
105
106        # can't unregister non-existent object
107        p = select.poll()
108        self.assertRaises(KeyError, p.unregister, 3)
109
110        # Test error cases
111        pollster = select.poll()
112        class Nope:
113            pass
114
115        class Almost:
116            def fileno(self):
117                return 'fileno'
118
119        self.assertRaises(TypeError, pollster.register, Nope(), 0)
120        self.assertRaises(TypeError, pollster.register, Almost(), 0)
121
122    # Another test case for poll().  This is copied from the test case for
123    # select(), modified to use poll() instead.
124
125    def test_poll2(self):
126        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
127        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
128                                bufsize=0)
129        p = proc.stdout
130        pollster = select.poll()
131        pollster.register( p, select.POLLIN )
132        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
133            fdlist = pollster.poll(tout)
134            if (fdlist == []):
135                continue
136            fd, flags = fdlist[0]
137            if flags & select.POLLHUP:
138                line = p.readline()
139                if line != b"":
140                    self.fail('error: pipe seems to be closed, but still returns data')
141                continue
142
143            elif flags & select.POLLIN:
144                line = p.readline()
145                if not line:
146                    break
147                self.assertEqual(line, b'testing...\n')
148                continue
149            else:
150                self.fail('Unexpected return value from select.poll: %s' % fdlist)
151        p.close()
152
153    def test_poll3(self):
154        # test int overflow
155        pollster = select.poll()
156        pollster.register(1)
157
158        self.assertRaises(OverflowError, pollster.poll, 1 << 64)
159
160        x = 2 + 3
161        if x != 5:
162            self.fail('Overflow must have occurred')
163
164        # Issues #15989, #17919
165        self.assertRaises(OverflowError, pollster.register, 0, -1)
166        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
167        self.assertRaises(OverflowError, pollster.modify, 1, -1)
168        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
169        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
170        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
171
172    @unittest.skipUnless(threading, 'Threading required for this test.')
173    @reap_threads
174    def test_threaded_poll(self):
175        r, w = os.pipe()
176        self.addCleanup(os.close, r)
177        self.addCleanup(os.close, w)
178        rfds = []
179        for i in range(10):
180            fd = os.dup(r)
181            self.addCleanup(os.close, fd)
182            rfds.append(fd)
183        pollster = select.poll()
184        for fd in rfds:
185            pollster.register(fd, select.POLLIN)
186
187        t = threading.Thread(target=pollster.poll)
188        t.start()
189        try:
190            time.sleep(0.5)
191            # trigger ufds array reallocation
192            for fd in rfds:
193                pollster.unregister(fd)
194            pollster.register(w, select.POLLOUT)
195            self.assertRaises(RuntimeError, pollster.poll)
196        finally:
197            # and make the call to poll() from the thread return
198            os.write(w, b'spam')
199            t.join()
200
201
202def test_main():
203    run_unittest(PollTests)
204
205if __name__ == '__main__':
206    test_main()
207