1# test asynchat
2
3from test import support
4
5# If this fails, the test will be skipped.
6thread = support.import_module('_thread')
7
8import asynchat
9import asyncore
10import errno
11import socket
12import sys
13import time
14import unittest
15import unittest.mock
16try:
17    import threading
18except ImportError:
19    threading = None
20
21HOST = support.HOST
22SERVER_QUIT = b'QUIT\n'
23TIMEOUT = 3.0
24
25if threading:
26    class echo_server(threading.Thread):
27        # parameter to determine the number of bytes passed back to the
28        # client each send
29        chunk_size = 1
30
31        def __init__(self, event):
32            threading.Thread.__init__(self)
33            self.event = event
34            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
35            self.port = support.bind_port(self.sock)
36            # This will be set if the client wants us to wait before echoing
37            # data back.
38            self.start_resend_event = None
39
40        def run(self):
41            self.sock.listen()
42            self.event.set()
43            conn, client = self.sock.accept()
44            self.buffer = b""
45            # collect data until quit message is seen
46            while SERVER_QUIT not in self.buffer:
47                data = conn.recv(1)
48                if not data:
49                    break
50                self.buffer = self.buffer + data
51
52            # remove the SERVER_QUIT message
53            self.buffer = self.buffer.replace(SERVER_QUIT, b'')
54
55            if self.start_resend_event:
56                self.start_resend_event.wait()
57
58            # re-send entire set of collected data
59            try:
60                # this may fail on some tests, such as test_close_when_done,
61                # since the client closes the channel when it's done sending
62                while self.buffer:
63                    n = conn.send(self.buffer[:self.chunk_size])
64                    time.sleep(0.001)
65                    self.buffer = self.buffer[n:]
66            except:
67                pass
68
69            conn.close()
70            self.sock.close()
71
72    class echo_client(asynchat.async_chat):
73
74        def __init__(self, terminator, server_port):
75            asynchat.async_chat.__init__(self)
76            self.contents = []
77            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
78            self.connect((HOST, server_port))
79            self.set_terminator(terminator)
80            self.buffer = b""
81
82            def handle_connect(self):
83                pass
84
85            if sys.platform == 'darwin':
86                # select.poll returns a select.POLLHUP at the end of the tests
87                # on darwin, so just ignore it
88                def handle_expt(self):
89                    pass
90
91        def collect_incoming_data(self, data):
92            self.buffer += data
93
94        def found_terminator(self):
95            self.contents.append(self.buffer)
96            self.buffer = b""
97
98    def start_echo_server():
99        event = threading.Event()
100        s = echo_server(event)
101        s.start()
102        event.wait()
103        event.clear()
104        time.sleep(0.01)   # Give server time to start accepting.
105        return s, event
106
107
108@unittest.skipUnless(threading, 'Threading required for this test.')
109class TestAsynchat(unittest.TestCase):
110    usepoll = False
111
112    def setUp(self):
113        self._threads = support.threading_setup()
114
115    def tearDown(self):
116        support.threading_cleanup(*self._threads)
117
118    def line_terminator_check(self, term, server_chunk):
119        event = threading.Event()
120        s = echo_server(event)
121        s.chunk_size = server_chunk
122        s.start()
123        event.wait()
124        event.clear()
125        time.sleep(0.01)   # Give server time to start accepting.
126        c = echo_client(term, s.port)
127        c.push(b"hello ")
128        c.push(b"world" + term)
129        c.push(b"I'm not dead yet!" + term)
130        c.push(SERVER_QUIT)
131        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
132        s.join(timeout=TIMEOUT)
133        if s.is_alive():
134            self.fail("join() timed out")
135
136        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
137
138    # the line terminator tests below check receiving variously-sized
139    # chunks back from the server in order to exercise all branches of
140    # async_chat.handle_read
141
142    def test_line_terminator1(self):
143        # test one-character terminator
144        for l in (1, 2, 3):
145            self.line_terminator_check(b'\n', l)
146
147    def test_line_terminator2(self):
148        # test two-character terminator
149        for l in (1, 2, 3):
150            self.line_terminator_check(b'\r\n', l)
151
152    def test_line_terminator3(self):
153        # test three-character terminator
154        for l in (1, 2, 3):
155            self.line_terminator_check(b'qqq', l)
156
157    def numeric_terminator_check(self, termlen):
158        # Try reading a fixed number of bytes
159        s, event = start_echo_server()
160        c = echo_client(termlen, s.port)
161        data = b"hello world, I'm not dead yet!\n"
162        c.push(data)
163        c.push(SERVER_QUIT)
164        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
165        s.join(timeout=TIMEOUT)
166        if s.is_alive():
167            self.fail("join() timed out")
168
169        self.assertEqual(c.contents, [data[:termlen]])
170
171    def test_numeric_terminator1(self):
172        # check that ints & longs both work (since type is
173        # explicitly checked in async_chat.handle_read)
174        self.numeric_terminator_check(1)
175
176    def test_numeric_terminator2(self):
177        self.numeric_terminator_check(6)
178
179    def test_none_terminator(self):
180        # Try reading a fixed number of bytes
181        s, event = start_echo_server()
182        c = echo_client(None, s.port)
183        data = b"hello world, I'm not dead yet!\n"
184        c.push(data)
185        c.push(SERVER_QUIT)
186        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
187        s.join(timeout=TIMEOUT)
188        if s.is_alive():
189            self.fail("join() timed out")
190
191        self.assertEqual(c.contents, [])
192        self.assertEqual(c.buffer, data)
193
194    def test_simple_producer(self):
195        s, event = start_echo_server()
196        c = echo_client(b'\n', s.port)
197        data = b"hello world\nI'm not dead yet!\n"
198        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
199        c.push_with_producer(p)
200        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
201        s.join(timeout=TIMEOUT)
202        if s.is_alive():
203            self.fail("join() timed out")
204
205        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
206
207    def test_string_producer(self):
208        s, event = start_echo_server()
209        c = echo_client(b'\n', s.port)
210        data = b"hello world\nI'm not dead yet!\n"
211        c.push_with_producer(data+SERVER_QUIT)
212        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
213        s.join(timeout=TIMEOUT)
214        if s.is_alive():
215            self.fail("join() timed out")
216
217        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
218
219    def test_empty_line(self):
220        # checks that empty lines are handled correctly
221        s, event = start_echo_server()
222        c = echo_client(b'\n', s.port)
223        c.push(b"hello world\n\nI'm not dead yet!\n")
224        c.push(SERVER_QUIT)
225        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
226        s.join(timeout=TIMEOUT)
227        if s.is_alive():
228            self.fail("join() timed out")
229
230        self.assertEqual(c.contents,
231                         [b"hello world", b"", b"I'm not dead yet!"])
232
233    def test_close_when_done(self):
234        s, event = start_echo_server()
235        s.start_resend_event = threading.Event()
236        c = echo_client(b'\n', s.port)
237        c.push(b"hello world\nI'm not dead yet!\n")
238        c.push(SERVER_QUIT)
239        c.close_when_done()
240        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
241
242        # Only allow the server to start echoing data back to the client after
243        # the client has closed its connection.  This prevents a race condition
244        # where the server echoes all of its data before we can check that it
245        # got any down below.
246        s.start_resend_event.set()
247        s.join(timeout=TIMEOUT)
248        if s.is_alive():
249            self.fail("join() timed out")
250
251        self.assertEqual(c.contents, [])
252        # the server might have been able to send a byte or two back, but this
253        # at least checks that it received something and didn't just fail
254        # (which could still result in the client not having received anything)
255        self.assertGreater(len(s.buffer), 0)
256
257    def test_push(self):
258        # Issue #12523: push() should raise a TypeError if it doesn't get
259        # a bytes string
260        s, event = start_echo_server()
261        c = echo_client(b'\n', s.port)
262        data = b'bytes\n'
263        c.push(data)
264        c.push(bytearray(data))
265        c.push(memoryview(data))
266        self.assertRaises(TypeError, c.push, 10)
267        self.assertRaises(TypeError, c.push, 'unicode')
268        c.push(SERVER_QUIT)
269        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
270        s.join(timeout=TIMEOUT)
271        self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])
272
273
274class TestAsynchat_WithPoll(TestAsynchat):
275    usepoll = True
276
277
278class TestAsynchatMocked(unittest.TestCase):
279    def test_blockingioerror(self):
280        # Issue #16133: handle_read() must ignore BlockingIOError
281        sock = unittest.mock.Mock()
282        sock.recv.side_effect = BlockingIOError(errno.EAGAIN)
283
284        dispatcher = asynchat.async_chat()
285        dispatcher.set_socket(sock)
286        self.addCleanup(dispatcher.del_channel)
287
288        with unittest.mock.patch.object(dispatcher, 'handle_error') as error:
289            dispatcher.handle_read()
290        self.assertFalse(error.called)
291
292
293class TestHelperFunctions(unittest.TestCase):
294    def test_find_prefix_at_end(self):
295        self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
296        self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
297
298
299class TestNotConnected(unittest.TestCase):
300    def test_disallow_negative_terminator(self):
301        # Issue #11259
302        client = asynchat.async_chat()
303        self.assertRaises(ValueError, client.set_terminator, -1)
304
305
306
307if __name__ == "__main__":
308    unittest.main()
309