import socket
import telnetlib
import time
import Queue

import unittest
from unittest import TestCase
from test import test_support
threading = test_support.import_module('threading')

HOST = test_support.HOST
# Unique marker that terminates a data script handed to server().
EOF_sigil = object()


def server(evt, serv, dataq=None):
    """Run a one-shot TCP server for a single test connection.

    1) Listen on serv and set evt to let the parent know we are ready.
    2) Accept one connection.
    3) [optional] If dataq is given, fetch one list from dataq.get() and
       play it to the client: strings are written to the socket, ints and
       floats are sleep intervals in seconds, and EOF_sigil ends the list.

    The accepted connection and the listening socket are always closed
    before returning.  A socket.timeout is swallowed; a Queue.Empty raised
    because no script arrived within 0.5 seconds still propagates.
    """
    serv.listen(5)
    evt.set()
    try:
        conn, addr = serv.accept()
        try:
            if dataq is not None:
                new_data = dataq.get(True, 0.5)
                # Acknowledge immediately: the tests call dataq.join() to
                # wait until the script has been picked up by this thread.
                dataq.task_done()
                for item in new_data:
                    if item is EOF_sigil:
                        break
                    if isinstance(item, (int, float)):
                        time.sleep(item)
                    else:
                        # sendall() so that a short write can never leave
                        # part of the script unsent.
                        conn.sendall(item)
        finally:
            conn.close()
    except socket.timeout:
        pass
    finally:
        serv.close()


class GeneralTests(TestCase):
    # Connection and timeout handling of telnetlib.Telnet against a server
    # that accepts one connection and closes it straight away.

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(60)  # Safety net. Look issue 11812
        self.port = test_support.bind_port(self.sock)
        self.thread = threading.Thread(target=server,
                                       args=(self.evt, self.sock))
        self.thread.daemon = True
        self.thread.start()
        # Do not connect before the server is listening.
        self.evt.wait()

    def tearDown(self):
        self.thread.join()

    def testBasic(self):
        # connects
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet.sock.close()

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port)
        finally:
            # Always restore the global default for the other tests.
            socket.setdefaulttimeout(None)
        self.assertEqual(telnet.sock.gettimeout(), 30)
        telnet.sock.close()

    def testTimeoutNone(self):
        # None, having other default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.assertIsNone(telnet.sock.gettimeout())
        telnet.sock.close()

    def testTimeoutValue(self):
        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
        self.assertEqual(telnet.sock.gettimeout(), 30)
        telnet.sock.close()

    def testTimeoutOpen(self):
        telnet = telnetlib.Telnet()
        telnet.open(HOST, self.port, timeout=30)
        self.assertEqual(telnet.sock.gettimeout(), 30)
        telnet.sock.close()


def _read_setUp(self):
    # Shared setUp: start a server thread that plays back whatever list of
    # data the test puts on self.dataq.
    self.evt = threading.Event()
    self.dataq = Queue.Queue()
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.sock.settimeout(10)
    self.port = test_support.bind_port(self.sock)
    self.thread = threading.Thread(target=server,
                                   args=(self.evt, self.sock, self.dataq))
    self.thread.start()
    self.evt.wait()


def _read_tearDown(self):
    # Shared tearDown: wait for the server thread to finish.
    self.thread.join()


class ReadTests(TestCase):
    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def test_read_until_A(self):
        """
        read_until(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('match')
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_until_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = ['hello', self.block_long, 'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('not seen', self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), 'not seen')

    def test_read_until_with_poll(self):
        """Use select.poll() to implement telnet.read_until()."""
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        if not telnet._has_poll:
            raise unittest.SkipTest('select.poll() is required')
        telnet._has_poll = True
        self.dataq.join()
        data = telnet.read_until('match')
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_until_with_select(self):
        """Use select.select() to implement telnet.read_until()."""
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet._has_poll = False
        self.dataq.join()
        data = telnet.read_until('match')
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_all_A(self):
        """
        read_all()
          Read all data until EOF; may block.
        """
        want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_all()
        self.assertEqual(data, ''.join(want[:-1]))

    def _test_blocking(self, func):
        # The server sleeps block_long before closing, so a blocking read
        # must take at least block_short to return.
        self.dataq.put([self.block_long, EOF_sigil])
        self.dataq.join()
        start = time.time()
        func()
        self.assertTrue(self.block_short <= time.time() - start)

    def test_read_all_B(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)

    def test_read_all_C(self):
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        telnet.read_all()
        telnet.read_all()  # shouldn't raise

    def test_read_some_A(self):
        """
        read_some()
          Read at least one byte or EOF; may block.
        """
        # test 'at least one byte'
        want = ['x' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        # This must exercise read_some(), not read_all().
        data = telnet.read_some()
        self.assertTrue(len(data) >= 1)

    def test_read_some_B(self):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.assertEqual('', telnet.read_some())

    def test_read_some_C(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)

    def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = ''
        # Poll until EOF; whatever has arrived so far must be a prefix of
        # the expected data.
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)

    def _test_read_any_eager_B(self, func_name):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        func = getattr(telnet, func_name)
        self.assertRaises(EOFError, func)

    # read_eager and read_very_eager make the same guarantees
    # (they behave differently but we only test the guarantees)
    def test_read_very_eager_A(self):
        self._test_read_any_eager_A('read_very_eager')

    def test_read_very_eager_B(self):
        self._test_read_any_eager_B('read_very_eager')

    def test_read_eager_A(self):
        self._test_read_any_eager_A('read_eager')

    def test_read_eager_B(self):
        self._test_read_any_eager_B('read_eager')
    # NB -- we need to test the IAC block which is mentioned in the docstring
    # but not in the module docs

    def _test_read_any_lazy_B(self, func_name):
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        telnet.fill_rawq()
        self.assertRaises(EOFError, func)

    def test_read_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual('', telnet.read_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_lazy_B(self):
        self._test_read_any_lazy_B('read_lazy')

    def test_read_very_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual('', telnet.read_very_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                telnet.fill_rawq()
                self.assertEqual('', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_very_lazy_B(self):
        self._test_read_any_lazy_B('read_very_lazy')


# Short alias used by the option-negotiation helpers and tests below.
tl = telnetlib


class nego_collector(object):
    # Option-negotiation callback that records every (cmd, opt) pair it is
    # given and, when sb_getter is set, the subnegotiation data available
    # each time an SE command arrives.

    def __init__(self, sb_getter=None):
        self.seen = ''
        self.sb_getter = sb_getter
        self.sb_seen = ''

    def do_nego(self, sock, cmd, opt):
        self.seen += cmd + opt
        if cmd == tl.SE and self.sb_getter:
            sb_data = self.sb_getter()
            self.sb_seen += sb_data


class OptionTests(TestCase):
    setUp = _read_setUp
    tearDown = _read_tearDown
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _test_command(self, data):
        """ helper for testing IAC + cmd """
        # Each call needs its own server, so run the fixture by hand.
        self.setUp()
        self.dataq.put(data)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        nego = nego_collector()
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        cmd = nego.seen
        self.assertTrue(len(cmd) > 0)  # we expect at least one command
        self.assertIn(cmd[0], self.cmds)
        self.assertEqual(cmd[1], tl.NOOPT)
        self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
        nego.sb_getter = None  # break the nego => telnet cycle
        self.tearDown()

    def test_IAC_commands(self):
        # reset our setup
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.tearDown()

        for cmd in self.cmds:
            self._test_command(['x' * 100, tl.IAC + cmd, 'y' * 100, EOF_sigil])
            self._test_command(['x' * 10, tl.IAC + cmd, 'y' * 10, EOF_sigil])
            self._test_command([tl.IAC + cmd, EOF_sigil])
        # all at once
        self._test_command([tl.IAC + cmd for cmd in self.cmds] + [EOF_sigil])
        self.assertEqual('', telnet.read_sb_data())

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
                tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
                EOF_sigil,
               ]
        self.dataq.put(send)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        nego = nego_collector(telnet.read_sb_data)
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        self.assertEqual(txt, '')
        want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
        self.assertEqual(nego.sb_seen, want_sb_data)
        self.assertEqual('', telnet.read_sb_data())
        nego.sb_getter = None  # break the nego => telnet cycle


class ExpectTests(TestCase):
    # Same fixture as ReadTests and OptionTests: a server thread that plays
    # back the list put on self.dataq.
    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def test_expect_A(self):
        """
        expect(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        (_, _, data) = telnet.expect(['match'])
        self.assertEqual(data, ''.join(want[:-2]))

    def test_expect_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = ['hello', self.block_long, 'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        (_, _, data) = telnet.expect(['not seen'], self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), 'not seen')

    def test_expect_with_poll(self):
        """Use select.poll() to implement telnet.expect()."""
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        if not telnet._has_poll:
            raise unittest.SkipTest('select.poll() is required')
        telnet._has_poll = True
        self.dataq.join()
        (_, _, data) = telnet.expect(['match'])
        self.assertEqual(data, ''.join(want[:-2]))

    def test_expect_with_select(self):
        """Use select.select() to implement telnet.expect()."""
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet._has_poll = False
        self.dataq.join()
        (_, _, data) = telnet.expect(['match'])
        self.assertEqual(data, ''.join(want[:-2]))


def test_main(verbose=None):
    test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
                              ExpectTests)

if __name__ == '__main__':
    test_main()