# test_poll.py revision 07c34bf19f0dad2d9780e213f462882932b79429
# Test case for the select.poll() function

import os
import random
import select
import subprocess
import unittest
from test.support import TESTFN, run_unittest

try:
    select.poll
except AttributeError:
    raise unittest.SkipTest("select.poll not defined")


def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask intersects *flag*.

    *ready* is an iterable of ``(fd, mode)`` pairs, the shape returned by
    ``poll()``.  The result is a list of the matching ``fd`` values in
    their original order; it is empty when nothing matches.
    """
    return [fd for fd, mode in ready if mode & flag]


class PollTests(unittest.TestCase):
    """Functional and error-path tests for ``select.poll`` objects."""

    def test_poll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipes and test that poll works with them.

        p = select.poll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}

        # Descriptors that are still open; whatever is left here when the
        # test ends (normally nothing) is closed so a failure half-way
        # through the loop does not leak pipes.
        open_fds = set()

        def close_open_fds():
            for fd in open_fds:
                os.close(fd)
        self.addCleanup(close_open_fds)

        for _ in range(NUM_PIPES):
            rd, wr = os.pipe()
            open_fds.update((rd, wr))
            # Register with the default mask first, then narrow it, so
            # that modify() gets exercised as well as register().
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr

        bufs = []

        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                raise RuntimeError("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                raise RuntimeError("no pipes ready for reading")
            rd = random.choice(ready_readers)
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # Retire both ends of the pipe that was just drained.
            peer = r2w[rd]
            p.unregister(peer)
            p.unregister(rd)
            for fd in (peer, rd):
                open_fds.discard(fd)
                os.close(fd)
            writers.remove(peer)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_poll_unit_tests(self):
        # returns NVAL for invalid file descriptor
        FD, w = os.pipe()
        os.close(FD)
        os.close(w)
        p = select.poll()
        p.register(FD)
        r = p.poll()
        self.assertEqual(r[0], (FD, select.POLLNVAL))

        f = open(TESTFN, 'w')
        try:
            fd = f.fileno()
            p = select.poll()
            p.register(f)
            r = p.poll()
            self.assertEqual(r[0][0], fd)
            # Once the file is closed its descriptor must be reported
            # as invalid.
            f.close()
            r = p.poll()
            self.assertEqual(r[0], (fd, select.POLLNVAL))
        finally:
            # close() on an already closed file is a no-op; this only
            # matters when an assertion above fired early.
            f.close()
            os.unlink(TESTFN)

        # type error for invalid arguments
        p = select.poll()
        self.assertRaises(TypeError, p.register, p)
        self.assertRaises(TypeError, p.unregister, p)

        # can't unregister non-existent object
        p = select.poll()
        self.assertRaises(KeyError, p.unregister, 3)

        # Test error cases
        pollster = select.poll()

        class Nope:
            # No fileno() method at all.
            pass

        class Almost:
            # fileno() exists but does not return an integer.
            def fileno(self):
                return 'fileno'

        self.assertRaises(TypeError, pollster.register, Nope(), 0)
        self.assertRaises(TypeError, pollster.register, Almost(), 0)

    # Another test case for poll().  This is copied from the test case for
    # select(), modified to use poll() instead.

    def test_poll2(self):
        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                bufsize=0)
        p = proc.stdout
        try:
            pollster = select.poll()
            pollster.register(p, select.POLLIN)
            for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,) * 10:
                fdlist = pollster.poll(tout)
                if not fdlist:
                    # Timed out with nothing ready; try the next timeout.
                    continue
                fd, flags = fdlist[0]
                if flags & select.POLLHUP:
                    line = p.readline()
                    if line != b"":
                        self.fail('error: pipe seems to be closed, but still returns data')
                    continue
                elif flags & select.POLLIN:
                    line = p.readline()
                    if not line:
                        break
                    self.assertEqual(line, b'testing...\n')
                    continue
                else:
                    self.fail('Unexpected return value from select.poll: %s' % fdlist)
        finally:
            # Always release the pipe and reap the shell, even when an
            # assertion above failed, so no descriptor or zombie is left.
            p.close()
            proc.wait()

    def test_poll3(self):
        # test int overflow
        pollster = select.poll()
        pollster.register(1)

        self.assertRaises(OverflowError, pollster.poll, 1 << 64)

        # NOTE(review): presumably a sanity check that the failed call
        # above left the interpreter in a usable state -- confirm against
        # the issue this test was added for.
        x = 2 + 3
        if x != 5:
            self.fail('Overflow must have occurred')


def test_main():
    """Run the PollTests suite via test.support.run_unittest()."""
    run_unittest(PollTests)


if __name__ == '__main__':
    test_main()