# Test case for the select.poll() function
2
3import os
4import subprocess
5import random
6import select
7try:
8    import threading
9except ImportError:
10    threading = None
11import time
12import unittest
13from test.support import TESTFN, run_unittest, reap_threads, cpython_only
14
# Abort the import of this module with SkipTest when the select module
# does not expose a poll attribute; every test below depends on it.
if not hasattr(select, 'poll'):
    raise unittest.SkipTest("select.poll not defined")
19
20
def find_ready_matching(ready, flag):
    """Return the descriptors from *ready* whose event mask overlaps *flag*.

    *ready* is an iterable of ``(fd, mode)`` pairs.  A descriptor is kept
    when ``mode & flag`` is non-zero; the order of *ready* is preserved.
    """
    return [fd for fd, mode in ready if mode & flag]
27
class PollTests(unittest.TestCase):
    """Functional and error-path tests for select.poll() objects."""

    def test_poll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipes and test that poll works with them.

        p = select.poll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}

        # Every descriptor that is currently open.  Whatever is still in
        # here when the test ends (normally or through a failure) is closed
        # by the cleanup below, so a failing run does not leak pipes.
        open_fds = set()

        def close_leftovers():
            while open_fds:
                os.close(open_fds.pop())
        self.addCleanup(close_leftovers)

        for _ in range(NUM_PIPES):
            rd, wr = os.pipe()
            open_fds.update((rd, wr))
            # Register with the default event mask first, then narrow it
            # down with modify() so that both calls are exercised.
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr

        bufs = []

        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # Retire the pipe: drop both ends from the poll object before
            # closing them, so the poll object never holds a closed fd.
            paired_wr = r2w[rd]
            p.unregister(paired_wr)
            p.unregister(rd)
            for fd in (paired_wr, rd):
                open_fds.discard(fd)
                os.close(fd)
            writers.remove(paired_wr)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        # A file object can be registered directly; once it is closed its
        # descriptor must be reported as NVAL as well.
        with open(TESTFN, 'w') as f:
            # Remove the scratch file even if an assertion below fails.
            self.addCleanup(os.unlink, TESTFN)
            fd = f.fileno()
            p = select.poll()
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
        r = p.poll()
        self.assertEqual(r[0], (fd, select.POLLNVAL))

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        # No fileno() method at all.
        class Nope:
            pass

        # Has fileno(), but it does not return an integer.
        class Almost:
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    def test_poll2(self):
        # The child prints "testing..." ten times, sleeping one second
        # after each line, and then exits.
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                bufsize=0)
        # Enter the Popen context manually and leave it through a cleanup,
        # so the process is released however the test ends.
        proc.__enter__()
        self.addCleanup(proc.__exit__, None, None, None)
        p = proc.stdout
        pollster = select.poll()
        pollster.register(p, select.POLLIN)
        for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
            fdlist = pollster.poll(tout)
            if not fdlist:
                # Timed out with nothing ready; try the next timeout.
                continue
            _, flags = fdlist[0]
            if flags & select.POLLHUP:
                # The writer hung up: nothing more may be readable.
                line = p.readline()
                if line != b"":
                    self.fail('error: pipe seems to be closed, but still returns data')
                continue

            if flags & select.POLLIN:
                line = p.readline()
                if not line:
                    # End of file on the child's stdout.
                    break
                self.assertEqual(line, b'testing...\n')
                continue

            self.fail('Unexpected return value from select.poll: %s' % fdlist)

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # NOTE(review): this arithmetic check cannot fail on its own; it
        # presumably exists to show that the rejected call above left the
        # interpreter in a sane state -- confirm before removing.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')

        # Issues #15989, #17919
        # Event masks that are negative or far too large must be rejected
        # by both register() and modify().
        self.assertRaises(OverflowError, pollster.register, 0, -1)
        self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
        self.assertRaises(OverflowError, pollster.modify, 1, -1)
        self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)

    @cpython_only
    def test_poll_c_limits(self):
        # Same overflow checks as test_poll3, but probing values just past
        # the C type limits exported by _testcapi.
        from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
        pollster = select.poll()
        pollster.register(1)

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @reap_threads
    def test_threaded_poll(self):
        # Calling poll() on an object that another thread is already
        # polling must raise RuntimeError.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        rfds = []
        for _ in range(10):
            fd = os.dup(r)
            self.addCleanup(os.close, fd)
            rfds.append(fd)
        pollster = select.poll()
        for fd in rfds:
            pollster.register(fd, select.POLLIN)

        # Nothing has been written to the pipe yet, so this thread stays
        # inside poll() until the write in the finally clause below.
        t = threading.Thread(target=pollster.poll)
        t.start()
        try:
            # Give the thread time to enter poll().
            time.sleep(0.5)
            # trigger ufds array reallocation
            for fd in rfds:
                pollster.unregister(fd)
            pollster.register(w, select.POLLOUT)
            self.assertRaises(RuntimeError, pollster.poll)
        finally:
            # and make the call to poll() from the thread return
            os.write(w, b'spam')
            t.join()
210
211
def test_main():
    """Run the test case classes of this module through run_unittest()."""
    test_classes = [PollTests]
    run_unittest(*test_classes)


# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    test_main()
217