# Test case for the select.poll() function
2
3import os, select, random, unittest
4import _testcapi
5from test.test_support import TESTFN, run_unittest
6
# select.poll is not provided on every platform; when the attribute is
# missing the whole module is skipped instead of failing.
try:
    select.poll
except AttributeError:
    # Call form of raise: identical behaviour to the legacy
    # "raise Class, message" statement, but valid on every Python version.
    raise unittest.SkipTest("select.poll not defined -- skipping test_poll")
11
12
def find_ready_matching(ready, flag):
    """Return the descriptors from *ready* whose event mask includes *flag*.

    *ready* is an iterable of (fd, mode) pairs; a pair is selected when
    ``mode & flag`` is non-zero.  The order of *ready* is preserved.
    """
    return [descriptor for descriptor, events in ready if events & flag]
19
class PollTests(unittest.TestCase):
    """Functional and error-path tests for select.poll() objects."""

    # NOTE: the test methods are documented with comments rather than
    # docstrings so that unittest's verbose output keeps showing their names.

    def test_poll1(self):
        # Basic functional test of poll object:
        # create a bunch of pipes and test that poll works with them.

        p = select.poll()

        NUM_PIPES = 12
        MSG = " This is a test."
        MSG_LEN = len(MSG)
        # read end -> write end, for every pipe that is still open
        r2w = {}
        bufs = []

        try:
            for _ in range(NUM_PIPES):
                rd, wr = os.pipe()
                # Record the pair first so the cleanup below sees it even if
                # one of the registration calls raises.
                r2w[rd] = wr
                # The read end goes through register() with the default mask
                # followed by modify(); the write end uses an explicit mask.
                p.register(rd)
                p.modify(rd, select.POLLIN)
                p.register(wr, select.POLLOUT)

            while r2w:
                ready = p.poll()
                ready_writers = find_ready_matching(ready, select.POLLOUT)
                if not ready_writers:
                    raise RuntimeError("no pipes ready for writing")
                wr = random.choice(ready_writers)
                os.write(wr, MSG)

                ready = p.poll()
                ready_readers = find_ready_matching(ready, select.POLLIN)
                if not ready_readers:
                    raise RuntimeError("no pipes ready for reading")
                rd = random.choice(ready_readers)
                buf = os.read(rd, MSG_LEN)
                self.assertEqual(len(buf), MSG_LEN)
                bufs.append(buf)

                # Retire the pipe that was just read: unregister both ends
                # before closing them, so the poll object never holds a
                # closed descriptor.
                peer = r2w[rd]
                p.unregister(peer)
                p.unregister(rd)
                del r2w[rd]
                os.close(peer)
                os.close(rd)
        finally:
            # Only non-empty when something above failed: close whatever
            # pipes are still open so a failure does not leak descriptors.
            for rd, wr in r2w.items():
                os.close(rd)
                os.close(wr)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        # Obtain a descriptor number from the OS and close it, rather than
        # closing a hard-coded number that may belong to someone else.
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        f = open(TESTFN, 'w')
        try:
            with f:
                fd = f.fileno()
                p = select.poll()
                p.register(f)
                r = p.poll()
                self.assertEqual(r[0][0], fd)
            # The file is closed now, so its descriptor must be reported
            # as invalid.
            r = p.poll()
            self.assertEqual(r[0], (fd, select.POLLNVAL))
        finally:
            # Remove the scratch file even when an assertion failed.
            os.unlink(TESTFN)

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        # No fileno() method at all.
        class Nope:
            pass

        # Has a fileno() method, but it does not return an integer.
        class Almost:
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Historical name of the method above.  It lacked the "test" prefix and
    # was therefore never collected by unittest; kept as an alias so that
    # existing references keep working.
    poll_unit_tests = test_poll_unit_tests

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    def test_poll2(self):
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        p = os.popen(cmd, 'r')
        try:
            pollster = select.poll()
            pollster.register(p, select.POLLIN)
            for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
                fdlist = pollster.poll(tout)
                if not fdlist:
                    # Timed out with nothing ready; try the next timeout.
                    continue
                fd, flags = fdlist[0]
                if flags & select.POLLHUP:
                    line = p.readline()
                    if line != "":
                        self.fail('error: pipe seems to be closed, but still returns data')
                elif flags & select.POLLIN:
                    line = p.readline()
                    if not line:
                        # End of file: the child has finished.
                        break
                else:
                    self.fail('Unexpected return value from select.poll: %s' % fdlist)
        finally:
            # Close the pipe even when one of the checks above failed.
            p.close()

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        # 1 << 64 is the same (long) value as the former 1L << 64 literal,
        # written so that it parses on every Python version.
        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # NOTE(review): presumably checks that the failed call above did not
        # leave a pending error behind that would surface in an unrelated
        # operation -- confirm against the original issue.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')

        pollster = select.poll()
        # Issue 15989
        self.assertRaises(OverflowError, pollster.register, 0,
                          _testcapi.SHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.register, 0,
                          _testcapi.USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, _testcapi.INT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, _testcapi.UINT_MAX + 1)
162
def test_main():
    """Run the PollTests test case through test_support.run_unittest."""
    test_classes = (PollTests,)
    run_unittest(*test_classes)
165
# Executing this file directly as a script runs the whole suite.
if __name__ == "__main__":
    test_main()
168