test_asynchat.py revision 643e85df48c87f082b16ca9e4ac02856130fbdb3
1# test asynchat 2 3import asyncore, asynchat, socket, threading, time 4import unittest 5import sys 6from test import test_support 7 8# Skip tests if thread module does not exist. 9test_support.import_module('thread') 10 11HOST = test_support.HOST 12SERVER_QUIT = 'QUIT\n' 13 14class echo_server(threading.Thread): 15 # parameter to determine the number of bytes passed back to the 16 # client each send 17 chunk_size = 1 18 19 def __init__(self, event): 20 threading.Thread.__init__(self) 21 self.event = event 22 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 23 self.port = test_support.bind_port(self.sock) 24 25 def run(self): 26 self.sock.listen(1) 27 self.event.set() 28 conn, client = self.sock.accept() 29 self.buffer = "" 30 # collect data until quit message is seen 31 while SERVER_QUIT not in self.buffer: 32 data = conn.recv(1) 33 if not data: 34 break 35 self.buffer = self.buffer + data 36 37 # remove the SERVER_QUIT message 38 self.buffer = self.buffer.replace(SERVER_QUIT, '') 39 40 # re-send entire set of collected data 41 try: 42 # this may fail on some tests, such as test_close_when_done, since 43 # the client closes the channel when it's done sending 44 while self.buffer: 45 n = conn.send(self.buffer[:self.chunk_size]) 46 time.sleep(0.001) 47 self.buffer = self.buffer[n:] 48 except: 49 pass 50 51 conn.close() 52 self.sock.close() 53 54class echo_client(asynchat.async_chat): 55 56 def __init__(self, terminator, server_port): 57 asynchat.async_chat.__init__(self) 58 self.contents = [] 59 self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 60 self.connect((HOST, server_port)) 61 self.set_terminator(terminator) 62 self.buffer = '' 63 64 def handle_connect(self): 65 pass 66 67 if sys.platform == 'darwin': 68 # select.poll returns a select.POLLHUP at the end of the tests 69 # on darwin, so just ignore it 70 def handle_expt(self): 71 pass 72 73 def collect_incoming_data(self, data): 74 self.buffer += data 75 76 def found_terminator(self): 77 self.contents.append(self.buffer) 78 self.buffer = "" 79 80 81def start_echo_server(): 82 event = threading.Event() 83 s = echo_server(event) 84 s.start() 85 event.wait() 86 event.clear() 87 time.sleep(0.01) # Give server time to start accepting. 88 return s, event 89 90 91class TestAsynchat(unittest.TestCase): 92 usepoll = False 93 94 def setUp (self): 95 self._threads = test_support.threading_setup() 96 97 def tearDown (self): 98 test_support.threading_cleanup(*self._threads) 99 100 def line_terminator_check(self, term, server_chunk): 101 event = threading.Event() 102 s = echo_server(event) 103 s.chunk_size = server_chunk 104 s.start() 105 event.wait() 106 event.clear() 107 time.sleep(0.01) # Give server time to start accepting. 108 c = echo_client(term, s.port) 109 c.push("hello ") 110 c.push("world%s" % term) 111 c.push("I'm not dead yet!%s" % term) 112 c.push(SERVER_QUIT) 113 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 114 s.join() 115 116 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"]) 117 118 # the line terminator tests below check receiving variously-sized 119 # chunks back from the server in order to exercise all branches of 120 # async_chat.handle_read 121 122 def test_line_terminator1(self): 123 # test one-character terminator 124 for l in (1,2,3): 125 self.line_terminator_check('\n', l) 126 127 def test_line_terminator2(self): 128 # test two-character terminator 129 for l in (1,2,3): 130 self.line_terminator_check('\r\n', l) 131 132 def test_line_terminator3(self): 133 # test three-character terminator 134 for l in (1,2,3): 135 self.line_terminator_check('qqq', l) 136 137 def numeric_terminator_check(self, termlen): 138 # Try reading a fixed number of bytes 139 s, event = start_echo_server() 140 c = echo_client(termlen, s.port) 141 data = "hello world, I'm not dead yet!\n" 142 c.push(data) 143 c.push(SERVER_QUIT) 144 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 145 s.join() 146 147 self.assertEqual(c.contents, [data[:termlen]]) 148 149 def test_numeric_terminator1(self): 150 # check that ints & longs both work (since type is 151 # explicitly checked in async_chat.handle_read) 152 self.numeric_terminator_check(1) 153 self.numeric_terminator_check(1L) 154 155 def test_numeric_terminator2(self): 156 self.numeric_terminator_check(6L) 157 158 def test_none_terminator(self): 159 # Try reading a fixed number of bytes 160 s, event = start_echo_server() 161 c = echo_client(None, s.port) 162 data = "hello world, I'm not dead yet!\n" 163 c.push(data) 164 c.push(SERVER_QUIT) 165 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 166 s.join() 167 168 self.assertEqual(c.contents, []) 169 self.assertEqual(c.buffer, data) 170 171 def test_simple_producer(self): 172 s, event = start_echo_server() 173 c = echo_client('\n', s.port) 174 data = "hello world\nI'm not dead yet!\n" 175 p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8) 176 c.push_with_producer(p) 177 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 178 s.join() 179 180 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"]) 181 182 def test_string_producer(self): 183 s, event = start_echo_server() 184 c = echo_client('\n', s.port) 185 data = "hello world\nI'm not dead yet!\n" 186 c.push_with_producer(data+SERVER_QUIT) 187 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 188 s.join() 189 190 self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"]) 191 192 def test_empty_line(self): 193 # checks that empty lines are handled correctly 194 s, event = start_echo_server() 195 c = echo_client('\n', s.port) 196 c.push("hello world\n\nI'm not dead yet!\n") 197 c.push(SERVER_QUIT) 198 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 199 s.join() 200 201 self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"]) 202 203 def test_close_when_done(self): 204 s, event = start_echo_server() 205 c = echo_client('\n', s.port) 206 c.push("hello world\nI'm not dead yet!\n") 207 c.push(SERVER_QUIT) 208 c.close_when_done() 209 asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) 210 s.join() 211 212 self.assertEqual(c.contents, []) 213 # the server might have been able to send a byte or two back, but this 214 # at least checks that it received something and didn't just fail 215 # (which could still result in the client not having received anything) 216 self.assertTrue(len(s.buffer) > 0) 217 218 219class TestAsynchat_WithPoll(TestAsynchat): 220 usepoll = True 221 222class TestHelperFunctions(unittest.TestCase): 223 def test_find_prefix_at_end(self): 224 self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1) 225 self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0) 226 227class TestFifo(unittest.TestCase): 228 def test_basic(self): 229 f = asynchat.fifo() 230 f.push(7) 231 f.push('a') 232 self.assertEqual(len(f), 2) 233 self.assertEqual(f.first(), 7) 234 self.assertEqual(f.pop(), (1, 7)) 235 self.assertEqual(len(f), 1) 236 self.assertEqual(f.first(), 'a') 237 self.assertEqual(f.is_empty(), False) 238 self.assertEqual(f.pop(), (1, 'a')) 239 self.assertEqual(len(f), 0) 240 self.assertEqual(f.is_empty(), True) 241 self.assertEqual(f.pop(), (0, None)) 242 243 def test_given_list(self): 244 f = asynchat.fifo(['x', 17, 3]) 245 self.assertEqual(len(f), 3) 246 self.assertEqual(f.pop(), (1, 'x')) 247 self.assertEqual(f.pop(), (1, 17)) 248 self.assertEqual(f.pop(), (1, 3)) 249 self.assertEqual(f.pop(), (0, None)) 250 251 252def test_main(verbose=None): 253 test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll, 254 TestHelperFunctions, TestFifo) 255 256if __name__ == "__main__": 257 test_main(verbose=True) 258