# test_poll.py revision 03241e801780edd967923d1cce00c2d07b208e58
# Test case for the select.poll() function

import os
import subprocess
import random
import select
from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
try:
    import threading
except ImportError:
    threading = None
import time
import unittest
from test.support import TESTFN, run_unittest, reap_threads

try:
    select.poll
except AttributeError:
    raise unittest.SkipTest("select.poll not defined")


def find_ready_matching(ready, flag):
    """Return the fds from *ready* whose event mask has any bit of *flag* set.

    *ready* is an iterable of (fd, mode) pairs, the shape returned by
    poll(); the order of *ready* is preserved in the result.
    """
    return [fd for fd, mode in ready if mode & flag]


class PollTests(unittest.TestCase):
    # Functional tests for select.poll objects.

    def test_poll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipes and test that poll works with them.

        p = select.poll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}

        # Descriptors that are still open.  Whatever is left in here when
        # the test ends (e.g. after a failed assertion) gets closed by the
        # cleanup, so a failure does not leak pipes into later tests.
        open_fds = set()

        def close_leftover_fds():
            for fd in open_fds:
                os.close(fd)
        self.addCleanup(close_leftover_fds)

        for i in range(NUM_PIPES):
            rd, wr = os.pipe()
            open_fds.update((rd, wr))
            # Exercise modify() as well as both forms of register().
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr

        bufs = []

        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # Retire both ends of the pipe that was just drained.
            done_wr = r2w[rd]
            p.unregister(done_wr)
            p.unregister(rd)
            for fd in (done_wr, rd):
                open_fds.discard(fd)
                os.close(fd)
            writers.remove(done_wr)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        # A file object can be registered directly; once it has been closed
        # its descriptor must be reported as invalid too.
        with open(TESTFN, 'w') as f:
            # Remove the scratch file even if an assertion below fails.
            self.addCleanup(os.unlink, TESTFN)
            fd = f.fileno()
            p = select.poll()
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
        r = p.poll()
        self.assertEqual(r[0], (fd, select.POLLNVAL))

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        class Nope:
            # Has no fileno() method at all.
            pass

        class Almost:
            # Has fileno(), but it does not return an integer.
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    def test_poll2(self):
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        # The context manager closes the pipe and waits for the child on
        # every exit path, so a failing assertion does not leave an
        # unreaped shell or an open pipe behind.
        with subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                              bufsize=0) as proc:
            p = proc.stdout
            pollster = select.poll()
            pollster.register(p, select.POLLIN)
            for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
                fdlist = pollster.poll(tout)
                if not fdlist:
                    # Timed out with nothing ready; try the next timeout.
                    continue
                fd, flags = fdlist[0]
                if flags & select.POLLHUP:
                    line = p.readline()
                    if line != b"":
                        self.fail('error: pipe seems to be closed, but still returns data')
                    continue

                elif flags & select.POLLIN:
                    line = p.readline()
                    if not line:
                        # EOF: the child has finished writing.
                        break
                    self.assertEqual(line, b'testing...\n')
                    continue
                else:
                    self.fail('Unexpected return value from select.poll: %s' % fdlist)

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # NOTE(review): presumably guards against a stale overflow error
        # surfacing on a later, unrelated operation -- confirm.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')

        # Issues #15989, #17919
        self.assertRaises(OverflowError, pollster.register, 0, -1)
        self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.modify, 1, -1)
        self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
        self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)

    @unittest.skipUnless(threading, 'Threading required for this test.')
    @reap_threads
    def test_threaded_poll(self):
        # Calling poll() on an object that another thread is already
        # polling must raise RuntimeError.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        rfds = []
        for i in range(10):
            fd = os.dup(r)
            self.addCleanup(os.close, fd)
            rfds.append(fd)
        pollster = select.poll()
        for fd in rfds:
            pollster.register(fd, select.POLLIN)

        t = threading.Thread(target=pollster.poll)
        t.start()
        try:
            # Give the background thread time to block inside poll().
            time.sleep(0.5)
            # trigger ufds array reallocation
            for fd in rfds:
                pollster.unregister(fd)
            pollster.register(w, select.POLLOUT)
            self.assertRaises(RuntimeError, pollster.poll)
        finally:
            # and make the call to poll() from the thread return
            os.write(w, b'spam')
            t.join()


def test_main():
    # Entry point used by the regression-test runner.
    run_unittest(PollTests)


if __name__ == '__main__':
    test_main()