# test_asynchat.py revision 6e070814b232d663e222dec7252d8d187d7195ab
# test asynchat -- requires threading

import thread # If this fails, we can't test this module
import asyncore, asynchat, socket, threading, time
import unittest
import sys
from test import test_support

HOST = "127.0.0.1"
PORT = 54322
SERVER_QUIT = 'QUIT\n'


class echo_server(threading.Thread):
    """Single-connection echo server running in its own thread.

    The server accepts one client, collects everything it receives until
    SERVER_QUIT shows up in the data (or the peer closes the connection),
    strips SERVER_QUIT and sends the remainder back ``chunk_size`` bytes
    per send.  ``event`` is set once the listening socket is ready.
    """

    # parameter to determine the number of bytes passed back to the
    # client each send
    chunk_size = 1

    def __init__(self, event):
        threading.Thread.__init__(self)
        self.event = event

    def run(self):
        global PORT
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # bind_port's return value becomes the port clients connect to
            PORT = test_support.bind_port(sock, HOST, PORT)
            sock.listen(1)
            # tell the starting thread that the listening socket is set up
            self.event.set()
            conn, _addr = sock.accept()
            try:
                self._echo(conn)
            finally:
                conn.close()
        finally:
            # always release the listening socket, even if the exchange
            # with the client raised
            sock.close()

    def _echo(self, conn):
        # Collect data from conn until the quit message, then send it back.
        self.buffer = ""
        # collect data until quit message is seen
        while SERVER_QUIT not in self.buffer:
            data = conn.recv(1)
            if not data:
                break
            self.buffer = self.buffer + data

        # remove the SERVER_QUIT message
        self.buffer = self.buffer.replace(SERVER_QUIT, '')

        # re-send entire set of collected data
        try:
            # this may fail on some tests, such as test_close_when_done, since
            # the client closes the channel when it's done sending
            while self.buffer:
                n = conn.send(self.buffer[:self.chunk_size])
                time.sleep(0.001)
                self.buffer = self.buffer[n:]
        except socket.error:
            # best effort only: the peer going away is expected here, but
            # anything other than a socket failure should still propagate
            pass


class echo_client(asynchat.async_chat):
    """async_chat channel that connects to (HOST, PORT) on creation.

    Incoming data is accumulated in ``self.buffer``; each time the
    terminator is found the accumulated data is appended to
    ``self.contents`` and the buffer is reset.
    """

    def __init__(self, terminator):
        asynchat.async_chat.__init__(self)
        self.contents = []
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((HOST, PORT))
        self.set_terminator(terminator)
        self.buffer = ''

    def handle_connect(self):
        pass

    if sys.platform == 'darwin':
        # select.poll returns a select.POLLHUP at the end of the tests
        # on darwin, so just ignore it
        def handle_expt(self):
            pass

    def collect_incoming_data(self, data):
        self.buffer += data

    def found_terminator(self):
        self.contents.append(self.buffer)
        self.buffer = ""


def start_echo_server(chunk_size=None):
    """Start an echo_server thread and wait until it is listening.

    chunk_size, when given, overrides the server's per-send chunk size
    before the thread is started.  Returns the (server, event) pair.
    """
    event = threading.Event()
    s = echo_server(event)
    if chunk_size is not None:
        s.chunk_size = chunk_size
    s.start()
    event.wait()
    event.clear()
    time.sleep(0.01) # Give server time to start accepting.
    return s, event


class TestAsynchat(unittest.TestCase):
    usepoll = False

    def setUp (self):
        pass

    def tearDown (self):
        pass

    def line_terminator_check(self, term, server_chunk):
        """Push two term-delimited messages and check both are echoed back.

        server_chunk is the number of bytes the server sends per call.
        """
        s, event = start_echo_server(server_chunk)
        c = echo_client(term)
        c.push("hello ")
        c.push("world%s" % term)
        c.push("I'm not dead yet!%s" % term)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for l in (1,2,3):
            self.line_terminator_check('\n', l)

    def test_line_terminator2(self):
        # test two-character terminator
        for l in (1,2,3):
            self.line_terminator_check('\r\n', l)

    def test_line_terminator3(self):
        # test three-character terminator
        for l in (1,2,3):
            self.line_terminator_check('qqq', l)

    def numeric_terminator_check(self, termlen):
        """Check that a numeric terminator yields the first termlen bytes."""
        # Try reading a fixed number of bytes
        s, event = start_echo_server()
        c = echo_client(termlen)
        data = "hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that ints & longs both work (since type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)
        self.numeric_terminator_check(long(1))

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(long(6))

    def test_none_terminator(self):
        # With a None terminator nothing is ever split off: all echoed
        # data ends up in the client's buffer and contents stays empty
        s, event = start_echo_server()
        c = echo_client(None)
        data = "hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        s, event = start_echo_server()
        c = echo_client('\n')
        data = "hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])

    def test_string_producer(self):
        s, event = start_echo_server()
        c = echo_client('\n')
        data = "hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, event = start_echo_server()
        c = echo_client('\n')
        c.push("hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])

    def test_close_when_done(self):
        s, event = start_echo_server()
        c = echo_client('\n')
        c.push("hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertTrue(len(s.buffer) > 0)


class TestAsynchat_WithPoll(TestAsynchat):
    usepoll = True


class TestHelperFunctions(unittest.TestCase):
    def test_find_prefix_at_end(self):
        self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
        self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)


class TestFifo(unittest.TestCase):
    def test_basic(self):
        f = asynchat.fifo()
        f.push(7)
        f.push('a')
        self.assertEqual(len(f), 2)
        self.assertEqual(f.first(), 7)
        self.assertEqual(f.pop(), (1, 7))
        self.assertEqual(len(f), 1)
        self.assertEqual(f.first(), 'a')
        self.assertEqual(f.is_empty(), False)
        self.assertEqual(f.pop(), (1, 'a'))
        self.assertEqual(len(f), 0)
        self.assertEqual(f.is_empty(), True)
        self.assertEqual(f.pop(), (0, None))

    def test_given_list(self):
        f = asynchat.fifo(['x', 17, 3])
        self.assertEqual(len(f), 3)
        self.assertEqual(f.pop(), (1, 'x'))
        self.assertEqual(f.pop(), (1, 17))
        self.assertEqual(f.pop(), (1, 3))
        self.assertEqual(f.pop(), (0, None))


def test_main(verbose=None):
    test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
                              TestHelperFunctions, TestFifo)

if __name__ == "__main__":
    test_main(verbose=True)