test_asynchat.py revision 643e85df48c87f082b16ca9e4ac02856130fbdb3
1# test asynchat
2
3import asyncore, asynchat, socket, threading, time
4import unittest
5import sys
6from test import test_support
7
# Skip tests if thread module does not exist.
test_support.import_module('thread')

# Host address the echo client connects to (taken from test_support).
HOST = test_support.HOST
# Marker the client sends last; the echo server stops collecting input
# once this text shows up in what it has received.
SERVER_QUIT = 'QUIT\n'
13
class echo_server(threading.Thread):
    """Single-connection echo server that runs on its own thread.

    The server accepts exactly one client, reads from it one byte at a
    time until SERVER_QUIT shows up in the collected data (or the peer
    closes the connection), strips the quit marker and then sends the
    collected data back in pieces of at most ``chunk_size`` bytes.

    ``event`` is set once the socket is listening so the starting thread
    knows it may connect.  ``port`` holds the port picked by
    test_support.bind_port().  ``buffer`` holds whatever data has been
    collected but not yet sent back.
    """

    # parameter to determine the number of bytes passed back to the
    # client each send
    chunk_size = 1

    def __init__(self, event):
        threading.Thread.__init__(self)
        self.event = event
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = test_support.bind_port(self.sock)

    def run(self):
        """Serve one client: collect until SERVER_QUIT, then echo back."""
        conn = None
        try:
            self.sock.listen(1)
            self.event.set()
            conn, client = self.sock.accept()
            self.buffer = ""
            # collect data until quit message is seen
            while SERVER_QUIT not in self.buffer:
                data = conn.recv(1)
                if not data:
                    # peer closed the connection before sending QUIT
                    break
                self.buffer = self.buffer + data

            # remove the SERVER_QUIT message
            self.buffer = self.buffer.replace(SERVER_QUIT, '')

            # re-send entire set of collected data
            try:
                while self.buffer:
                    n = conn.send(self.buffer[:self.chunk_size])
                    time.sleep(0.001)
                    self.buffer = self.buffer[n:]
            except socket.error:
                # this may fail on some tests, such as test_close_when_done,
                # since the client closes the channel when it's done
                # sending.  Only socket-level failures are expected here;
                # anything else is a real bug and must not be hidden.
                pass
        finally:
            # Always release both sockets, even if accept/recv raised,
            # so the listening port is not leaked into later tests.
            if conn is not None:
                conn.close()
            self.sock.close()
53
class echo_client(asynchat.async_chat):
    """async_chat channel that records each terminator-delimited message.

    Incoming data accumulates in ``buffer``.  Whenever the terminator is
    found, the accumulated text is appended to ``contents`` and
    ``buffer`` starts over empty.
    """

    def __init__(self, terminator, server_port):
        asynchat.async_chat.__init__(self)
        self.contents = []
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        server_address = (HOST, server_port)
        self.connect(server_address)
        self.set_terminator(terminator)
        self.buffer = ''

    def handle_connect(self):
        # Nothing to do once the connection is established.
        pass

    if sys.platform == 'darwin':
        # On darwin select.poll reports select.POLLHUP when the tests
        # finish; override the handler so that event is simply ignored.
        def handle_expt(self):
            pass

    def collect_incoming_data(self, data):
        # Append the newly received piece to the pending message.
        self.buffer = self.buffer + data

    def found_terminator(self):
        # A complete message has arrived: store it and start a new one.
        message = self.buffer
        self.contents.append(message)
        self.buffer = ""
79
80
def start_echo_server():
    """Start an echo_server thread and wait until it is listening.

    Returns the ``(server, event)`` pair.  The event has already been
    waited on and cleared again by the time this function returns.
    """
    ready = threading.Event()
    server = echo_server(ready)
    server.start()
    ready.wait()
    ready.clear()
    # Give server time to start accepting.
    time.sleep(0.01)
    return server, ready
89
90
91class TestAsynchat(unittest.TestCase):
92    usepoll = False
93
94    def setUp (self):
95        self._threads = test_support.threading_setup()
96
97    def tearDown (self):
98        test_support.threading_cleanup(*self._threads)
99
100    def line_terminator_check(self, term, server_chunk):
101        event = threading.Event()
102        s = echo_server(event)
103        s.chunk_size = server_chunk
104        s.start()
105        event.wait()
106        event.clear()
107        time.sleep(0.01) # Give server time to start accepting.
108        c = echo_client(term, s.port)
109        c.push("hello ")
110        c.push("world%s" % term)
111        c.push("I'm not dead yet!%s" % term)
112        c.push(SERVER_QUIT)
113        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
114        s.join()
115
116        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
117
118    # the line terminator tests below check receiving variously-sized
119    # chunks back from the server in order to exercise all branches of
120    # async_chat.handle_read
121
122    def test_line_terminator1(self):
123        # test one-character terminator
124        for l in (1,2,3):
125            self.line_terminator_check('\n', l)
126
127    def test_line_terminator2(self):
128        # test two-character terminator
129        for l in (1,2,3):
130            self.line_terminator_check('\r\n', l)
131
132    def test_line_terminator3(self):
133        # test three-character terminator
134        for l in (1,2,3):
135            self.line_terminator_check('qqq', l)
136
137    def numeric_terminator_check(self, termlen):
138        # Try reading a fixed number of bytes
139        s, event = start_echo_server()
140        c = echo_client(termlen, s.port)
141        data = "hello world, I'm not dead yet!\n"
142        c.push(data)
143        c.push(SERVER_QUIT)
144        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
145        s.join()
146
147        self.assertEqual(c.contents, [data[:termlen]])
148
149    def test_numeric_terminator1(self):
150        # check that ints & longs both work (since type is
151        # explicitly checked in async_chat.handle_read)
152        self.numeric_terminator_check(1)
153        self.numeric_terminator_check(1L)
154
155    def test_numeric_terminator2(self):
156        self.numeric_terminator_check(6L)
157
158    def test_none_terminator(self):
159        # Try reading a fixed number of bytes
160        s, event = start_echo_server()
161        c = echo_client(None, s.port)
162        data = "hello world, I'm not dead yet!\n"
163        c.push(data)
164        c.push(SERVER_QUIT)
165        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
166        s.join()
167
168        self.assertEqual(c.contents, [])
169        self.assertEqual(c.buffer, data)
170
171    def test_simple_producer(self):
172        s, event = start_echo_server()
173        c = echo_client('\n', s.port)
174        data = "hello world\nI'm not dead yet!\n"
175        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
176        c.push_with_producer(p)
177        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
178        s.join()
179
180        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
181
182    def test_string_producer(self):
183        s, event = start_echo_server()
184        c = echo_client('\n', s.port)
185        data = "hello world\nI'm not dead yet!\n"
186        c.push_with_producer(data+SERVER_QUIT)
187        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
188        s.join()
189
190        self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
191
192    def test_empty_line(self):
193        # checks that empty lines are handled correctly
194        s, event = start_echo_server()
195        c = echo_client('\n', s.port)
196        c.push("hello world\n\nI'm not dead yet!\n")
197        c.push(SERVER_QUIT)
198        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
199        s.join()
200
201        self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
202
203    def test_close_when_done(self):
204        s, event = start_echo_server()
205        c = echo_client('\n', s.port)
206        c.push("hello world\nI'm not dead yet!\n")
207        c.push(SERVER_QUIT)
208        c.close_when_done()
209        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
210        s.join()
211
212        self.assertEqual(c.contents, [])
213        # the server might have been able to send a byte or two back, but this
214        # at least checks that it received something and didn't just fail
215        # (which could still result in the client not having received anything)
216        self.assertTrue(len(s.buffer) > 0)
217
218
class TestAsynchat_WithPoll(TestAsynchat):
    # Re-runs every inherited TestAsynchat test with use_poll=True
    # passed to asyncore.loop().
    usepoll = True
221
class TestHelperFunctions(unittest.TestCase):
    """Checks for the module-level helper functions in asynchat."""

    def test_find_prefix_at_end(self):
        # Each row is (haystack, needle, expected return value).
        cases = (
            ("qwerty\r", "\r\n", 1),
            ("qwertydkjf", "\r\n", 0),
        )
        for haystack, needle, expected in cases:
            found = asynchat.find_prefix_at_end(haystack, needle)
            self.assertEqual(found, expected)
226
227class TestFifo(unittest.TestCase):
228    def test_basic(self):
229        f = asynchat.fifo()
230        f.push(7)
231        f.push('a')
232        self.assertEqual(len(f), 2)
233        self.assertEqual(f.first(), 7)
234        self.assertEqual(f.pop(), (1, 7))
235        self.assertEqual(len(f), 1)
236        self.assertEqual(f.first(), 'a')
237        self.assertEqual(f.is_empty(), False)
238        self.assertEqual(f.pop(), (1, 'a'))
239        self.assertEqual(len(f), 0)
240        self.assertEqual(f.is_empty(), True)
241        self.assertEqual(f.pop(), (0, None))
242
243    def test_given_list(self):
244        f = asynchat.fifo(['x', 17, 3])
245        self.assertEqual(len(f), 3)
246        self.assertEqual(f.pop(), (1, 'x'))
247        self.assertEqual(f.pop(), (1, 17))
248        self.assertEqual(f.pop(), (1, 3))
249        self.assertEqual(f.pop(), (0, None))
250
251
def test_main(verbose=None):
    """Run every test case class in this module via test_support."""
    cases = (
        TestAsynchat,
        TestAsynchat_WithPoll,
        TestHelperFunctions,
        TestFifo,
    )
    test_support.run_unittest(*cases)
255
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main(verbose=True)
258