1import base64 2import datetime 3import decimal 4import sys 5import time 6import unittest 7from unittest import mock 8import xmlrpc.client as xmlrpclib 9import xmlrpc.server 10import http.client 11import http, http.server 12import socket 13import re 14import io 15import contextlib 16from test import support 17 18try: 19 import gzip 20except ImportError: 21 gzip = None 22try: 23 import threading 24except ImportError: 25 threading = None 26 27alist = [{'astring': 'foo@bar.baz.spam', 28 'afloat': 7283.43, 29 'anint': 2**20, 30 'ashortlong': 2, 31 'anotherlist': ['.zyx.41'], 32 'abase64': xmlrpclib.Binary(b"my dog has fleas"), 33 'b64bytes': b"my dog has fleas", 34 'b64bytearray': bytearray(b"my dog has fleas"), 35 'boolean': False, 36 'unicode': '\u4000\u6000\u8000', 37 'ukey\u4000': 'regular value', 38 'datetime1': xmlrpclib.DateTime('20050210T11:41:23'), 39 'datetime2': xmlrpclib.DateTime( 40 (2005, 2, 10, 11, 41, 23, 0, 1, -1)), 41 'datetime3': xmlrpclib.DateTime( 42 datetime.datetime(2005, 2, 10, 11, 41, 23)), 43 }] 44 45class XMLRPCTestCase(unittest.TestCase): 46 47 def test_dump_load(self): 48 dump = xmlrpclib.dumps((alist,)) 49 load = xmlrpclib.loads(dump) 50 self.assertEqual(alist, load[0][0]) 51 52 def test_dump_bare_datetime(self): 53 # This checks that an unwrapped datetime.date object can be handled 54 # by the marshalling code. This can't be done via test_dump_load() 55 # since with use_builtin_types set to 1 the unmarshaller would create 56 # datetime objects for the 'datetime[123]' keys as well 57 dt = datetime.datetime(2005, 2, 10, 11, 41, 23) 58 self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23')) 59 s = xmlrpclib.dumps((dt,)) 60 61 result, m = xmlrpclib.loads(s, use_builtin_types=True) 62 (newdt,) = result 63 self.assertEqual(newdt, dt) 64 self.assertIs(type(newdt), datetime.datetime) 65 self.assertIsNone(m) 66 67 result, m = xmlrpclib.loads(s, use_builtin_types=False) 68 (newdt,) = result 69 self.assertEqual(newdt, dt) 70 self.assertIs(type(newdt), xmlrpclib.DateTime) 71 self.assertIsNone(m) 72 73 result, m = xmlrpclib.loads(s, use_datetime=True) 74 (newdt,) = result 75 self.assertEqual(newdt, dt) 76 self.assertIs(type(newdt), datetime.datetime) 77 self.assertIsNone(m) 78 79 result, m = xmlrpclib.loads(s, use_datetime=False) 80 (newdt,) = result 81 self.assertEqual(newdt, dt) 82 self.assertIs(type(newdt), xmlrpclib.DateTime) 83 self.assertIsNone(m) 84 85 86 def test_datetime_before_1900(self): 87 # same as before but with a date before 1900 88 dt = datetime.datetime(1, 2, 10, 11, 41, 23) 89 self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23')) 90 s = xmlrpclib.dumps((dt,)) 91 92 result, m = xmlrpclib.loads(s, use_builtin_types=True) 93 (newdt,) = result 94 self.assertEqual(newdt, dt) 95 self.assertIs(type(newdt), datetime.datetime) 96 self.assertIsNone(m) 97 98 result, m = xmlrpclib.loads(s, use_builtin_types=False) 99 (newdt,) = result 100 self.assertEqual(newdt, dt) 101 self.assertIs(type(newdt), xmlrpclib.DateTime) 102 self.assertIsNone(m) 103 104 def test_bug_1164912 (self): 105 d = xmlrpclib.DateTime() 106 ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,), 107 methodresponse=True)) 108 self.assertIsInstance(new_d.value, str) 109 110 # Check that the output of dumps() is still an 8-bit string 111 s = xmlrpclib.dumps((new_d,), methodresponse=True) 112 self.assertIsInstance(s, str) 113 114 def test_newstyle_class(self): 115 class T(object): 116 pass 117 t = T() 118 t.x = 100 119 t.y = "Hello" 120 ((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,))) 121 self.assertEqual(t2, t.__dict__) 122 123 def test_dump_big_long(self): 124 self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,)) 125 126 def test_dump_bad_dict(self): 127 self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},)) 128 129 def test_dump_recursive_seq(self): 130 l = [1,2,3] 131 t = [3,4,5,l] 132 l.append(t) 133 self.assertRaises(TypeError, xmlrpclib.dumps, (l,)) 134 135 def test_dump_recursive_dict(self): 136 d = {'1':1, '2':1} 137 t = {'3':3, 'd':d} 138 d['t'] = t 139 self.assertRaises(TypeError, xmlrpclib.dumps, (d,)) 140 141 def test_dump_big_int(self): 142 if sys.maxsize > 2**31-1: 143 self.assertRaises(OverflowError, xmlrpclib.dumps, 144 (int(2**34),)) 145 146 xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT)) 147 self.assertRaises(OverflowError, xmlrpclib.dumps, 148 (xmlrpclib.MAXINT+1,)) 149 self.assertRaises(OverflowError, xmlrpclib.dumps, 150 (xmlrpclib.MININT-1,)) 151 152 def dummy_write(s): 153 pass 154 155 m = xmlrpclib.Marshaller() 156 m.dump_int(xmlrpclib.MAXINT, dummy_write) 157 m.dump_int(xmlrpclib.MININT, dummy_write) 158 self.assertRaises(OverflowError, m.dump_int, 159 xmlrpclib.MAXINT+1, dummy_write) 160 self.assertRaises(OverflowError, m.dump_int, 161 xmlrpclib.MININT-1, dummy_write) 162 163 def test_dump_double(self): 164 xmlrpclib.dumps((float(2 ** 34),)) 165 xmlrpclib.dumps((float(xmlrpclib.MAXINT), 166 float(xmlrpclib.MININT))) 167 xmlrpclib.dumps((float(xmlrpclib.MAXINT + 42), 168 float(xmlrpclib.MININT - 42))) 169 170 def dummy_write(s): 171 pass 172 173 m = xmlrpclib.Marshaller() 174 m.dump_double(xmlrpclib.MAXINT, dummy_write) 175 m.dump_double(xmlrpclib.MININT, dummy_write) 176 m.dump_double(xmlrpclib.MAXINT + 42, dummy_write) 177 m.dump_double(xmlrpclib.MININT - 42, dummy_write) 178 179 def test_dump_none(self): 180 value = alist + [None] 181 arg1 = (alist + [None],) 182 strg = xmlrpclib.dumps(arg1, allow_none=True) 183 self.assertEqual(value, 184 xmlrpclib.loads(strg)[0][0]) 185 self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,)) 186 187 def test_dump_encoding(self): 188 value = {'key\u20ac\xa4': 189 'value\u20ac\xa4'} 190 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15') 191 strg = "<?xml version='1.0' encoding='iso-8859-15'?>" + strg 192 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 193 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 194 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 195 196 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 197 methodresponse=True) 198 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 199 strg = strg.encode('iso-8859-15', 'xmlcharrefreplace') 200 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 201 202 methodname = 'method\u20ac\xa4' 203 strg = xmlrpclib.dumps((value,), encoding='iso-8859-15', 204 methodname=methodname) 205 self.assertEqual(xmlrpclib.loads(strg)[0][0], value) 206 self.assertEqual(xmlrpclib.loads(strg)[1], methodname) 207 208 def test_dump_bytes(self): 209 sample = b"my dog has fleas" 210 self.assertEqual(sample, xmlrpclib.Binary(sample)) 211 for type_ in bytes, bytearray, xmlrpclib.Binary: 212 value = type_(sample) 213 s = xmlrpclib.dumps((value,)) 214 215 result, m = xmlrpclib.loads(s, use_builtin_types=True) 216 (newvalue,) = result 217 self.assertEqual(newvalue, sample) 218 self.assertIs(type(newvalue), bytes) 219 self.assertIsNone(m) 220 221 result, m = xmlrpclib.loads(s, use_builtin_types=False) 222 (newvalue,) = result 223 self.assertEqual(newvalue, sample) 224 self.assertIs(type(newvalue), xmlrpclib.Binary) 225 self.assertIsNone(m) 226 227 def test_loads_unsupported(self): 228 ResponseError = xmlrpclib.ResponseError 229 data = '<params><param><value><spam/></value></param></params>' 230 self.assertRaises(ResponseError, xmlrpclib.loads, data) 231 data = ('<params><param><value><array>' 232 '<value><spam/></value>' 233 '</array></value></param></params>') 234 self.assertRaises(ResponseError, xmlrpclib.loads, data) 235 data = ('<params><param><value><struct>' 236 '<member><name>a</name><value><spam/></value></member>' 237 '<member><name>b</name><value><spam/></value></member>' 238 '</struct></value></param></params>') 239 self.assertRaises(ResponseError, xmlrpclib.loads, data) 240 241 def check_loads(self, s, value, **kwargs): 242 dump = '<params><param><value>%s</value></param></params>' % s 243 result, m = xmlrpclib.loads(dump, **kwargs) 244 (newvalue,) = result 245 self.assertEqual(newvalue, value) 246 self.assertIs(type(newvalue), type(value)) 247 self.assertIsNone(m) 248 249 def test_load_standard_types(self): 250 check = self.check_loads 251 check('string', 'string') 252 check('<string>string</string>', 'string') 253 check('<string> string</string>', ' string') 254 check('<int>2056183947</int>', 2056183947) 255 check('<int>-2056183947</int>', -2056183947) 256 check('<i4>2056183947</i4>', 2056183947) 257 check('<double>46093.78125</double>', 46093.78125) 258 check('<boolean>0</boolean>', False) 259 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 260 xmlrpclib.Binary(b'\x00byte string\xff')) 261 check('<base64>AGJ5dGUgc3RyaW5n/w==</base64>', 262 b'\x00byte string\xff', use_builtin_types=True) 263 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 264 xmlrpclib.DateTime('20050210T11:41:23')) 265 check('<dateTime.iso8601>20050210T11:41:23</dateTime.iso8601>', 266 datetime.datetime(2005, 2, 10, 11, 41, 23), 267 use_builtin_types=True) 268 check('<array><data>' 269 '<value><int>1</int></value><value><int>2</int></value>' 270 '</data></array>', [1, 2]) 271 check('<struct>' 272 '<member><name>b</name><value><int>2</int></value></member>' 273 '<member><name>a</name><value><int>1</int></value></member>' 274 '</struct>', {'a': 1, 'b': 2}) 275 276 def test_load_extension_types(self): 277 check = self.check_loads 278 check('<nil/>', None) 279 check('<ex:nil/>', None) 280 check('<i1>205</i1>', 205) 281 check('<i2>20561</i2>', 20561) 282 check('<i8>9876543210</i8>', 9876543210) 283 check('<biginteger>98765432100123456789</biginteger>', 284 98765432100123456789) 285 check('<float>93.78125</float>', 93.78125) 286 check('<bigdecimal>9876543210.0123456789</bigdecimal>', 287 decimal.Decimal('9876543210.0123456789')) 288 289 def test_get_host_info(self): 290 # see bug #3613, this raised a TypeError 291 transp = xmlrpc.client.Transport() 292 self.assertEqual(transp.get_host_info("user@host.tld"), 293 ('host.tld', 294 [('Authorization', 'Basic dXNlcg==')], {})) 295 296 def test_ssl_presence(self): 297 try: 298 import ssl 299 except ImportError: 300 has_ssl = False 301 else: 302 has_ssl = True 303 try: 304 xmlrpc.client.ServerProxy('https://localhost:9999').bad_function() 305 except NotImplementedError: 306 self.assertFalse(has_ssl, "xmlrpc client's error with SSL support") 307 except OSError: 308 self.assertTrue(has_ssl) 309 310 @unittest.skipUnless(threading, "Threading required for this test.") 311 def test_keepalive_disconnect(self): 312 class RequestHandler(http.server.BaseHTTPRequestHandler): 313 protocol_version = "HTTP/1.1" 314 handled = False 315 316 def do_POST(self): 317 length = int(self.headers.get("Content-Length")) 318 self.rfile.read(length) 319 if self.handled: 320 self.close_connection = True 321 return 322 response = xmlrpclib.dumps((5,), methodresponse=True) 323 response = response.encode() 324 self.send_response(http.HTTPStatus.OK) 325 self.send_header("Content-Length", len(response)) 326 self.end_headers() 327 self.wfile.write(response) 328 self.handled = True 329 self.close_connection = False 330 331 def run_server(): 332 server.socket.settimeout(float(1)) # Don't hang if client fails 333 server.handle_request() # First request and attempt at second 334 server.handle_request() # Retried second request 335 336 server = http.server.HTTPServer((support.HOST, 0), RequestHandler) 337 self.addCleanup(server.server_close) 338 thread = threading.Thread(target=run_server) 339 thread.start() 340 self.addCleanup(thread.join) 341 url = "http://{}:{}/".format(*server.server_address) 342 with xmlrpclib.ServerProxy(url) as p: 343 self.assertEqual(p.method(), 5) 344 self.assertEqual(p.method(), 5) 345 346class HelperTestCase(unittest.TestCase): 347 def test_escape(self): 348 self.assertEqual(xmlrpclib.escape("a&b"), "a&b") 349 self.assertEqual(xmlrpclib.escape("a<b"), "a<b") 350 self.assertEqual(xmlrpclib.escape("a>b"), "a>b") 351 352class FaultTestCase(unittest.TestCase): 353 def test_repr(self): 354 f = xmlrpclib.Fault(42, 'Test Fault') 355 self.assertEqual(repr(f), "<Fault 42: 'Test Fault'>") 356 self.assertEqual(repr(f), str(f)) 357 358 def test_dump_fault(self): 359 f = xmlrpclib.Fault(42, 'Test Fault') 360 s = xmlrpclib.dumps((f,)) 361 (newf,), m = xmlrpclib.loads(s) 362 self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'}) 363 self.assertEqual(m, None) 364 365 s = xmlrpclib.Marshaller().dumps(f) 366 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s) 367 368 def test_dotted_attribute(self): 369 # this will raise AttributeError because code don't want us to use 370 # private methods 371 self.assertRaises(AttributeError, 372 xmlrpc.server.resolve_dotted_attribute, str, '__add') 373 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 374 375class DateTimeTestCase(unittest.TestCase): 376 def test_default(self): 377 with mock.patch('time.localtime') as localtime_mock: 378 time_struct = time.struct_time( 379 [2013, 7, 15, 0, 24, 49, 0, 196, 0]) 380 localtime_mock.return_value = time_struct 381 localtime = time.localtime() 382 t = xmlrpclib.DateTime() 383 self.assertEqual(str(t), 384 time.strftime("%Y%m%dT%H:%M:%S", localtime)) 385 386 def test_time(self): 387 d = 1181399930.036952 388 t = xmlrpclib.DateTime(d) 389 self.assertEqual(str(t), 390 time.strftime("%Y%m%dT%H:%M:%S", time.localtime(d))) 391 392 def test_time_tuple(self): 393 d = (2007,6,9,10,38,50,5,160,0) 394 t = xmlrpclib.DateTime(d) 395 self.assertEqual(str(t), '20070609T10:38:50') 396 397 def test_time_struct(self): 398 d = time.localtime(1181399930.036952) 399 t = xmlrpclib.DateTime(d) 400 self.assertEqual(str(t), time.strftime("%Y%m%dT%H:%M:%S", d)) 401 402 def test_datetime_datetime(self): 403 d = datetime.datetime(2007,1,2,3,4,5) 404 t = xmlrpclib.DateTime(d) 405 self.assertEqual(str(t), '20070102T03:04:05') 406 407 def test_repr(self): 408 d = datetime.datetime(2007,1,2,3,4,5) 409 t = xmlrpclib.DateTime(d) 410 val ="<DateTime '20070102T03:04:05' at %#x>" % id(t) 411 self.assertEqual(repr(t), val) 412 413 def test_decode(self): 414 d = ' 20070908T07:11:13 ' 415 t1 = xmlrpclib.DateTime() 416 t1.decode(d) 417 tref = xmlrpclib.DateTime(datetime.datetime(2007,9,8,7,11,13)) 418 self.assertEqual(t1, tref) 419 420 t2 = xmlrpclib._datetime(d) 421 self.assertEqual(t2, tref) 422 423 def test_comparison(self): 424 now = datetime.datetime.now() 425 dtime = xmlrpclib.DateTime(now.timetuple()) 426 427 # datetime vs. DateTime 428 self.assertTrue(dtime == now) 429 self.assertTrue(now == dtime) 430 then = now + datetime.timedelta(seconds=4) 431 self.assertTrue(then >= dtime) 432 self.assertTrue(dtime < then) 433 434 # str vs. DateTime 435 dstr = now.strftime("%Y%m%dT%H:%M:%S") 436 self.assertTrue(dtime == dstr) 437 self.assertTrue(dstr == dtime) 438 dtime_then = xmlrpclib.DateTime(then.timetuple()) 439 self.assertTrue(dtime_then >= dstr) 440 self.assertTrue(dstr < dtime_then) 441 442 # some other types 443 dbytes = dstr.encode('ascii') 444 dtuple = now.timetuple() 445 with self.assertRaises(TypeError): 446 dtime == 1970 447 with self.assertRaises(TypeError): 448 dtime != dbytes 449 with self.assertRaises(TypeError): 450 dtime == bytearray(dbytes) 451 with self.assertRaises(TypeError): 452 dtime != dtuple 453 with self.assertRaises(TypeError): 454 dtime < float(1970) 455 with self.assertRaises(TypeError): 456 dtime > dbytes 457 with self.assertRaises(TypeError): 458 dtime <= bytearray(dbytes) 459 with self.assertRaises(TypeError): 460 dtime >= dtuple 461 462class BinaryTestCase(unittest.TestCase): 463 464 # XXX What should str(Binary(b"\xff")) return? I'm chosing "\xff" 465 # for now (i.e. interpreting the binary data as Latin-1-encoded 466 # text). But this feels very unsatisfactory. Perhaps we should 467 # only define repr(), and return r"Binary(b'\xff')" instead? 468 469 def test_default(self): 470 t = xmlrpclib.Binary() 471 self.assertEqual(str(t), '') 472 473 def test_string(self): 474 d = b'\x01\x02\x03abc123\xff\xfe' 475 t = xmlrpclib.Binary(d) 476 self.assertEqual(str(t), str(d, "latin-1")) 477 478 def test_decode(self): 479 d = b'\x01\x02\x03abc123\xff\xfe' 480 de = base64.encodebytes(d) 481 t1 = xmlrpclib.Binary() 482 t1.decode(de) 483 self.assertEqual(str(t1), str(d, "latin-1")) 484 485 t2 = xmlrpclib._binary(de) 486 self.assertEqual(str(t2), str(d, "latin-1")) 487 488 489ADDR = PORT = URL = None 490 491# The evt is set twice. First when the server is ready to serve. 492# Second when the server has been shutdown. The user must clear 493# the event after it has been set the first time to catch the second set. 494def http_server(evt, numrequests, requestHandler=None, encoding=None): 495 class TestInstanceClass: 496 def div(self, x, y): 497 return x // y 498 499 def _methodHelp(self, name): 500 if name == 'div': 501 return 'This is the div function' 502 503 class Fixture: 504 @staticmethod 505 def getData(): 506 return '42' 507 508 def my_function(): 509 '''This is my function''' 510 return True 511 512 class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer): 513 def get_request(self): 514 # Ensure the socket is always non-blocking. On Linux, socket 515 # attributes are not inherited like they are on *BSD and Windows. 516 s, port = self.socket.accept() 517 s.setblocking(True) 518 return s, port 519 520 if not requestHandler: 521 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 522 serv = MyXMLRPCServer(("localhost", 0), requestHandler, 523 encoding=encoding, 524 logRequests=False, bind_and_activate=False) 525 try: 526 serv.server_bind() 527 global ADDR, PORT, URL 528 ADDR, PORT = serv.socket.getsockname() 529 #connect to IP address directly. This avoids socket.create_connection() 530 #trying to connect to "localhost" using all address families, which 531 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 532 #on AF_INET only. 533 URL = "http://%s:%d"%(ADDR, PORT) 534 serv.server_activate() 535 serv.register_introspection_functions() 536 serv.register_multicall_functions() 537 serv.register_function(pow) 538 serv.register_function(lambda x,y: x+y, 'add') 539 serv.register_function(lambda x: x, 'têšt') 540 serv.register_function(my_function) 541 testInstance = TestInstanceClass() 542 serv.register_instance(testInstance, allow_dotted_names=True) 543 evt.set() 544 545 # handle up to 'numrequests' requests 546 while numrequests > 0: 547 serv.handle_request() 548 numrequests -= 1 549 550 except socket.timeout: 551 pass 552 finally: 553 serv.socket.close() 554 PORT = None 555 evt.set() 556 557def http_multi_server(evt, numrequests, requestHandler=None): 558 class TestInstanceClass: 559 def div(self, x, y): 560 return x // y 561 562 def _methodHelp(self, name): 563 if name == 'div': 564 return 'This is the div function' 565 566 def my_function(): 567 '''This is my function''' 568 return True 569 570 class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): 571 def get_request(self): 572 # Ensure the socket is always non-blocking. On Linux, socket 573 # attributes are not inherited like they are on *BSD and Windows. 574 s, port = self.socket.accept() 575 s.setblocking(True) 576 return s, port 577 578 if not requestHandler: 579 requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler 580 class MyRequestHandler(requestHandler): 581 rpc_paths = [] 582 583 class BrokenDispatcher: 584 def _marshaled_dispatch(self, data, dispatch_method=None, path=None): 585 raise RuntimeError("broken dispatcher") 586 587 serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, 588 logRequests=False, bind_and_activate=False) 589 serv.socket.settimeout(3) 590 serv.server_bind() 591 try: 592 global ADDR, PORT, URL 593 ADDR, PORT = serv.socket.getsockname() 594 #connect to IP address directly. This avoids socket.create_connection() 595 #trying to connect to "localhost" using all address families, which 596 #causes slowdown e.g. on vista which supports AF_INET6. The server listens 597 #on AF_INET only. 598 URL = "http://%s:%d"%(ADDR, PORT) 599 serv.server_activate() 600 paths = ["/foo", "/foo/bar"] 601 for path in paths: 602 d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) 603 d.register_introspection_functions() 604 d.register_multicall_functions() 605 serv.get_dispatcher(paths[0]).register_function(pow) 606 serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') 607 serv.add_dispatcher("/is/broken", BrokenDispatcher()) 608 evt.set() 609 610 # handle up to 'numrequests' requests 611 while numrequests > 0: 612 serv.handle_request() 613 numrequests -= 1 614 615 except socket.timeout: 616 pass 617 finally: 618 serv.socket.close() 619 PORT = None 620 evt.set() 621 622# This function prevents errors like: 623# <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error> 624def is_unavailable_exception(e): 625 '''Returns True if the given ProtocolError is the product of a server-side 626 exception caused by the 'temporarily unavailable' response sometimes 627 given by operations on non-blocking sockets.''' 628 629 # sometimes we get a -1 error code and/or empty headers 630 try: 631 if e.errcode == -1 or e.headers is None: 632 return True 633 exc_mess = e.headers.get('X-exception') 634 except AttributeError: 635 # Ignore OSErrors here. 636 exc_mess = str(e) 637 638 if exc_mess and 'temporarily unavailable' in exc_mess.lower(): 639 return True 640 641def make_request_and_skipIf(condition, reason): 642 # If we skip the test, we have to make a request because 643 # the server created in setUp blocks expecting one to come in. 644 if not condition: 645 return lambda func: func 646 def decorator(func): 647 def make_request_and_skip(self): 648 try: 649 xmlrpclib.ServerProxy(URL).my_function() 650 except (xmlrpclib.ProtocolError, OSError) as e: 651 if not is_unavailable_exception(e): 652 raise 653 raise unittest.SkipTest(reason) 654 return make_request_and_skip 655 return decorator 656 657@unittest.skipUnless(threading, 'Threading required for this test.') 658class BaseServerTestCase(unittest.TestCase): 659 requestHandler = None 660 request_count = 1 661 threadFunc = staticmethod(http_server) 662 663 def setUp(self): 664 # enable traceback reporting 665 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 666 667 self.evt = threading.Event() 668 # start server thread to handle requests 669 serv_args = (self.evt, self.request_count, self.requestHandler) 670 threading.Thread(target=self.threadFunc, args=serv_args).start() 671 672 # wait for the server to be ready 673 self.evt.wait() 674 self.evt.clear() 675 676 def tearDown(self): 677 # wait on the server thread to terminate 678 self.evt.wait() 679 680 # disable traceback reporting 681 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 682 683class SimpleServerTestCase(BaseServerTestCase): 684 def test_simple1(self): 685 try: 686 p = xmlrpclib.ServerProxy(URL) 687 self.assertEqual(p.pow(6,8), 6**8) 688 except (xmlrpclib.ProtocolError, OSError) as e: 689 # ignore failures due to non-blocking socket 'unavailable' errors 690 if not is_unavailable_exception(e): 691 # protocol error; provide additional information in test output 692 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 693 694 def test_nonascii(self): 695 start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t' 696 end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n' 697 try: 698 p = xmlrpclib.ServerProxy(URL) 699 self.assertEqual(p.add(start_string, end_string), 700 start_string + end_string) 701 except (xmlrpclib.ProtocolError, OSError) as e: 702 # ignore failures due to non-blocking socket 'unavailable' errors 703 if not is_unavailable_exception(e): 704 # protocol error; provide additional information in test output 705 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 706 707 def test_client_encoding(self): 708 start_string = '\u20ac' 709 end_string = '\xa4' 710 711 try: 712 p = xmlrpclib.ServerProxy(URL, encoding='iso-8859-15') 713 self.assertEqual(p.add(start_string, end_string), 714 start_string + end_string) 715 except (xmlrpclib.ProtocolError, socket.error) as e: 716 # ignore failures due to non-blocking socket unavailable errors. 717 if not is_unavailable_exception(e): 718 # protocol error; provide additional information in test output 719 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 720 721 def test_nonascii_methodname(self): 722 try: 723 p = xmlrpclib.ServerProxy(URL, encoding='ascii') 724 self.assertEqual(p.têšt(42), 42) 725 except (xmlrpclib.ProtocolError, socket.error) as e: 726 # ignore failures due to non-blocking socket unavailable errors. 727 if not is_unavailable_exception(e): 728 # protocol error; provide additional information in test output 729 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 730 731 # [ch] The test 404 is causing lots of false alarms. 732 def XXXtest_404(self): 733 # send POST with http.client, it should return 404 header and 734 # 'Not Found' message. 735 conn = httplib.client.HTTPConnection(ADDR, PORT) 736 conn.request('POST', '/this-is-not-valid') 737 response = conn.getresponse() 738 conn.close() 739 740 self.assertEqual(response.status, 404) 741 self.assertEqual(response.reason, 'Not Found') 742 743 def test_introspection1(self): 744 expected_methods = set(['pow', 'div', 'my_function', 'add', 'têšt', 745 'system.listMethods', 'system.methodHelp', 746 'system.methodSignature', 'system.multicall', 747 'Fixture']) 748 try: 749 p = xmlrpclib.ServerProxy(URL) 750 meth = p.system.listMethods() 751 self.assertEqual(set(meth), expected_methods) 752 except (xmlrpclib.ProtocolError, OSError) as e: 753 # ignore failures due to non-blocking socket 'unavailable' errors 754 if not is_unavailable_exception(e): 755 # protocol error; provide additional information in test output 756 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 757 758 759 def test_introspection2(self): 760 try: 761 # test _methodHelp() 762 p = xmlrpclib.ServerProxy(URL) 763 divhelp = p.system.methodHelp('div') 764 self.assertEqual(divhelp, 'This is the div function') 765 except (xmlrpclib.ProtocolError, OSError) as e: 766 # ignore failures due to non-blocking socket 'unavailable' errors 767 if not is_unavailable_exception(e): 768 # protocol error; provide additional information in test output 769 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 770 771 @make_request_and_skipIf(sys.flags.optimize >= 2, 772 "Docstrings are omitted with -O2 and above") 773 def test_introspection3(self): 774 try: 775 # test native doc 776 p = xmlrpclib.ServerProxy(URL) 777 myfunction = p.system.methodHelp('my_function') 778 self.assertEqual(myfunction, 'This is my function') 779 except (xmlrpclib.ProtocolError, OSError) as e: 780 # ignore failures due to non-blocking socket 'unavailable' errors 781 if not is_unavailable_exception(e): 782 # protocol error; provide additional information in test output 783 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 784 785 def test_introspection4(self): 786 # the SimpleXMLRPCServer doesn't support signatures, but 787 # at least check that we can try making the call 788 try: 789 p = xmlrpclib.ServerProxy(URL) 790 divsig = p.system.methodSignature('div') 791 self.assertEqual(divsig, 'signatures not supported') 792 except (xmlrpclib.ProtocolError, OSError) as e: 793 # ignore failures due to non-blocking socket 'unavailable' errors 794 if not is_unavailable_exception(e): 795 # protocol error; provide additional information in test output 796 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 797 798 def test_multicall(self): 799 try: 800 p = xmlrpclib.ServerProxy(URL) 801 multicall = xmlrpclib.MultiCall(p) 802 multicall.add(2,3) 803 multicall.pow(6,8) 804 multicall.div(127,42) 805 add_result, pow_result, div_result = multicall() 806 self.assertEqual(add_result, 2+3) 807 self.assertEqual(pow_result, 6**8) 808 self.assertEqual(div_result, 127//42) 809 except (xmlrpclib.ProtocolError, OSError) as e: 810 # ignore failures due to non-blocking socket 'unavailable' errors 811 if not is_unavailable_exception(e): 812 # protocol error; provide additional information in test output 813 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 814 815 def test_non_existing_multicall(self): 816 try: 817 p = xmlrpclib.ServerProxy(URL) 818 multicall = xmlrpclib.MultiCall(p) 819 multicall.this_is_not_exists() 820 result = multicall() 821 822 # result.results contains; 823 # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:' 824 # 'method "this_is_not_exists" is not supported'>}] 825 826 self.assertEqual(result.results[0]['faultCode'], 1) 827 self.assertEqual(result.results[0]['faultString'], 828 '<class \'Exception\'>:method "this_is_not_exists" ' 829 'is not supported') 830 except (xmlrpclib.ProtocolError, OSError) as e: 831 # ignore failures due to non-blocking socket 'unavailable' errors 832 if not is_unavailable_exception(e): 833 # protocol error; provide additional information in test output 834 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 835 836 def test_dotted_attribute(self): 837 # Raises an AttributeError because private methods are not allowed. 838 self.assertRaises(AttributeError, 839 xmlrpc.server.resolve_dotted_attribute, str, '__add') 840 841 self.assertTrue(xmlrpc.server.resolve_dotted_attribute(str, 'title')) 842 # Get the test to run faster by sending a request with test_simple1. 843 # This avoids waiting for the socket timeout. 844 self.test_simple1() 845 846 def test_allow_dotted_names_true(self): 847 # XXX also need allow_dotted_names_false test. 848 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 849 data = server.Fixture.getData() 850 self.assertEqual(data, '42') 851 852 def test_unicode_host(self): 853 server = xmlrpclib.ServerProxy("http://%s:%d/RPC2" % (ADDR, PORT)) 854 self.assertEqual(server.add("a", "\xe9"), "a\xe9") 855 856 def test_partial_post(self): 857 # Check that a partial POST doesn't make the server loop: issue #14001. 858 conn = http.client.HTTPConnection(ADDR, PORT) 859 conn.request('POST', '/RPC2 HTTP/1.0\r\nContent-Length: 100\r\n\r\nbye') 860 conn.close() 861 862 def test_context_manager(self): 863 with xmlrpclib.ServerProxy(URL) as server: 864 server.add(2, 3) 865 self.assertNotEqual(server('transport')._connection, 866 (None, None)) 867 self.assertEqual(server('transport')._connection, 868 (None, None)) 869 870 def test_context_manager_method_error(self): 871 try: 872 with xmlrpclib.ServerProxy(URL) as server: 873 server.add(2, "a") 874 except xmlrpclib.Fault: 875 pass 876 self.assertEqual(server('transport')._connection, 877 (None, None)) 878 879 880class SimpleServerEncodingTestCase(BaseServerTestCase): 881 @staticmethod 882 def threadFunc(evt, numrequests, requestHandler=None, encoding=None): 883 http_server(evt, numrequests, requestHandler, 'iso-8859-15') 884 885 def test_server_encoding(self): 886 start_string = '\u20ac' 887 end_string = '\xa4' 888 889 try: 890 p = xmlrpclib.ServerProxy(URL) 891 self.assertEqual(p.add(start_string, end_string), 892 start_string + end_string) 893 except (xmlrpclib.ProtocolError, socket.error) as e: 894 # ignore failures due to non-blocking socket unavailable errors. 895 if not is_unavailable_exception(e): 896 # protocol error; provide additional information in test output 897 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 898 899 900class MultiPathServerTestCase(BaseServerTestCase): 901 threadFunc = staticmethod(http_multi_server) 902 request_count = 2 903 def test_path1(self): 904 p = xmlrpclib.ServerProxy(URL+"/foo") 905 self.assertEqual(p.pow(6,8), 6**8) 906 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 907 908 def test_path2(self): 909 p = xmlrpclib.ServerProxy(URL+"/foo/bar") 910 self.assertEqual(p.add(6,8), 6+8) 911 self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) 912 913 def test_path3(self): 914 p = xmlrpclib.ServerProxy(URL+"/is/broken") 915 self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) 916 917#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 918#does indeed serve subsequent requests on the same connection 919class BaseKeepaliveServerTestCase(BaseServerTestCase): 920 #a request handler that supports keep-alive and logs requests into a 921 #class variable 922 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 923 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 924 protocol_version = 'HTTP/1.1' 925 myRequests = [] 926 def handle(self): 927 self.myRequests.append([]) 928 self.reqidx = len(self.myRequests)-1 929 return self.parentClass.handle(self) 930 def handle_one_request(self): 931 result = self.parentClass.handle_one_request(self) 932 self.myRequests[self.reqidx].append(self.raw_requestline) 933 return result 934 935 requestHandler = RequestHandler 936 def setUp(self): 937 #clear request log 938 self.RequestHandler.myRequests = [] 939 return BaseServerTestCase.setUp(self) 940 941#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism 942#does indeed serve subsequent requests on the same connection 943class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): 944 def test_two(self): 945 p = xmlrpclib.ServerProxy(URL) 946 #do three requests. 947 self.assertEqual(p.pow(6,8), 6**8) 948 self.assertEqual(p.pow(6,8), 6**8) 949 self.assertEqual(p.pow(6,8), 6**8) 950 p("close")() 951 952 #they should have all been handled by a single request handler 953 self.assertEqual(len(self.RequestHandler.myRequests), 1) 954 955 #check that we did at least two (the third may be pending append 956 #due to thread scheduling) 957 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 958 959 960#test special attribute access on the serverproxy, through the __call__ 961#function. 962class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): 963 #ask for two keepalive requests to be handled. 964 request_count=2 965 966 def test_close(self): 967 p = xmlrpclib.ServerProxy(URL) 968 #do some requests with close. 969 self.assertEqual(p.pow(6,8), 6**8) 970 self.assertEqual(p.pow(6,8), 6**8) 971 self.assertEqual(p.pow(6,8), 6**8) 972 p("close")() #this should trigger a new keep-alive request 973 self.assertEqual(p.pow(6,8), 6**8) 974 self.assertEqual(p.pow(6,8), 6**8) 975 self.assertEqual(p.pow(6,8), 6**8) 976 p("close")() 977 978 #they should have all been two request handlers, each having logged at least 979 #two complete requests 980 self.assertEqual(len(self.RequestHandler.myRequests), 2) 981 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) 982 self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) 983 984 985 def test_transport(self): 986 p = xmlrpclib.ServerProxy(URL) 987 #do some requests with close. 988 self.assertEqual(p.pow(6,8), 6**8) 989 p("transport").close() #same as above, really. 990 self.assertEqual(p.pow(6,8), 6**8) 991 p("close")() 992 self.assertEqual(len(self.RequestHandler.myRequests), 2) 993 994#A test case that verifies that gzip encoding works in both directions 995#(for a request and the response) 996@unittest.skipIf(gzip is None, 'requires gzip') 997class GzipServerTestCase(BaseServerTestCase): 998 #a request handler that supports keep-alive and logs requests into a 999 #class variable 1000 class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): 1001 parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler 1002 protocol_version = 'HTTP/1.1' 1003 1004 def do_POST(self): 1005 #store content of last request in class 1006 self.__class__.content_length = int(self.headers["content-length"]) 1007 return self.parentClass.do_POST(self) 1008 requestHandler = RequestHandler 1009 1010 class Transport(xmlrpclib.Transport): 1011 #custom transport, stores the response length for our perusal 1012 fake_gzip = False 1013 def parse_response(self, response): 1014 self.response_length=int(response.getheader("content-length", 0)) 1015 return xmlrpclib.Transport.parse_response(self, response) 1016 1017 def send_content(self, connection, body): 1018 if self.fake_gzip: 1019 #add a lone gzip header to induce decode error remotely 1020 connection.putheader("Content-Encoding", "gzip") 1021 return xmlrpclib.Transport.send_content(self, connection, body) 1022 1023 def setUp(self): 1024 BaseServerTestCase.setUp(self) 1025 1026 def test_gzip_request(self): 1027 t = self.Transport() 1028 t.encode_threshold = None 1029 p = xmlrpclib.ServerProxy(URL, transport=t) 1030 self.assertEqual(p.pow(6,8), 6**8) 1031 a = self.RequestHandler.content_length 1032 t.encode_threshold = 0 #turn on request encoding 1033 self.assertEqual(p.pow(6,8), 6**8) 1034 b = self.RequestHandler.content_length 1035 self.assertTrue(a>b) 1036 p("close")() 1037 1038 def test_bad_gzip_request(self): 1039 t = self.Transport() 1040 t.encode_threshold = None 1041 t.fake_gzip = True 1042 p = xmlrpclib.ServerProxy(URL, transport=t) 1043 cm = self.assertRaisesRegex(xmlrpclib.ProtocolError, 1044 re.compile(r"\b400\b")) 1045 with cm: 1046 p.pow(6, 8) 1047 p("close")() 1048 1049 def test_gzip_response(self): 1050 t = self.Transport() 1051 p = xmlrpclib.ServerProxy(URL, transport=t) 1052 old = self.requestHandler.encode_threshold 1053 self.requestHandler.encode_threshold = None #no encoding 1054 self.assertEqual(p.pow(6,8), 6**8) 1055 a = t.response_length 1056 self.requestHandler.encode_threshold = 0 #always encode 1057 self.assertEqual(p.pow(6,8), 6**8) 1058 p("close")() 1059 b = t.response_length 1060 self.requestHandler.encode_threshold = old 1061 self.assertTrue(a>b) 1062 1063 1064@unittest.skipIf(gzip is None, 'requires gzip') 1065class GzipUtilTestCase(unittest.TestCase): 1066 1067 def test_gzip_decode_limit(self): 1068 max_gzip_decode = 20 * 1024 * 1024 1069 data = b'\0' * max_gzip_decode 1070 encoded = xmlrpclib.gzip_encode(data) 1071 decoded = xmlrpclib.gzip_decode(encoded) 1072 self.assertEqual(len(decoded), max_gzip_decode) 1073 1074 data = b'\0' * (max_gzip_decode + 1) 1075 encoded = xmlrpclib.gzip_encode(data) 1076 1077 with self.assertRaisesRegex(ValueError, 1078 "max gzipped payload length exceeded"): 1079 xmlrpclib.gzip_decode(encoded) 1080 1081 xmlrpclib.gzip_decode(encoded, max_decode=-1) 1082 1083 1084#Test special attributes of the ServerProxy object 1085class ServerProxyTestCase(unittest.TestCase): 1086 def setUp(self): 1087 unittest.TestCase.setUp(self) 1088 if threading: 1089 self.url = URL 1090 else: 1091 # Without threading, http_server() and http_multi_server() will not 1092 # be executed and URL is still equal to None. 'http://' is a just 1093 # enough to choose the scheme (HTTP) 1094 self.url = 'http://' 1095 1096 def test_close(self): 1097 p = xmlrpclib.ServerProxy(self.url) 1098 self.assertEqual(p('close')(), None) 1099 1100 def test_transport(self): 1101 t = xmlrpclib.Transport() 1102 p = xmlrpclib.ServerProxy(self.url, transport=t) 1103 self.assertEqual(p('transport'), t) 1104 1105 1106# This is a contrived way to make a failure occur on the server side 1107# in order to test the _send_traceback_header flag on the server 1108class FailingMessageClass(http.client.HTTPMessage): 1109 def get(self, key, failobj=None): 1110 key = key.lower() 1111 if key == 'content-length': 1112 return 'I am broken' 1113 return super().get(key, failobj) 1114 1115 1116@unittest.skipUnless(threading, 'Threading required for this test.') 1117class FailingServerTestCase(unittest.TestCase): 1118 def setUp(self): 1119 self.evt = threading.Event() 1120 # start server thread to handle requests 1121 serv_args = (self.evt, 1) 1122 threading.Thread(target=http_server, args=serv_args).start() 1123 1124 # wait for the server to be ready 1125 self.evt.wait() 1126 self.evt.clear() 1127 1128 def tearDown(self): 1129 # wait on the server thread to terminate 1130 self.evt.wait() 1131 # reset flag 1132 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False 1133 # reset message class 1134 default_class = http.client.HTTPMessage 1135 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = default_class 1136 1137 def test_basic(self): 1138 # check that flag is false by default 1139 flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header 1140 self.assertEqual(flagval, False) 1141 1142 # enable traceback reporting 1143 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1144 1145 # test a call that shouldn't fail just as a smoke test 1146 try: 1147 p = xmlrpclib.ServerProxy(URL) 1148 self.assertEqual(p.pow(6,8), 6**8) 1149 except (xmlrpclib.ProtocolError, OSError) as e: 1150 # ignore failures due to non-blocking socket 'unavailable' errors 1151 if not is_unavailable_exception(e): 1152 # protocol error; provide additional information in test output 1153 self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) 1154 1155 def test_fail_no_info(self): 1156 # use the broken message class 1157 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1158 1159 try: 1160 p = xmlrpclib.ServerProxy(URL) 1161 p.pow(6,8) 1162 except (xmlrpclib.ProtocolError, OSError) as e: 1163 # ignore failures due to non-blocking socket 'unavailable' errors 1164 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1165 # The two server-side error headers shouldn't be sent back in this case 1166 self.assertTrue(e.headers.get("X-exception") is None) 1167 self.assertTrue(e.headers.get("X-traceback") is None) 1168 else: 1169 self.fail('ProtocolError not raised') 1170 1171 def test_fail_with_info(self): 1172 # use the broken message class 1173 xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass 1174 1175 # Check that errors in the server send back exception/traceback 1176 # info when flag is set 1177 xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True 1178 1179 try: 1180 p = xmlrpclib.ServerProxy(URL) 1181 p.pow(6,8) 1182 except (xmlrpclib.ProtocolError, OSError) as e: 1183 # ignore failures due to non-blocking socket 'unavailable' errors 1184 if not is_unavailable_exception(e) and hasattr(e, "headers"): 1185 # We should get error info in the response 1186 expected_err = "invalid literal for int() with base 10: 'I am broken'" 1187 self.assertEqual(e.headers.get("X-exception"), expected_err) 1188 self.assertTrue(e.headers.get("X-traceback") is not None) 1189 else: 1190 self.fail('ProtocolError not raised') 1191 1192 1193@contextlib.contextmanager 1194def captured_stdout(encoding='utf-8'): 1195 """A variation on support.captured_stdout() which gives a text stream 1196 having a `buffer` attribute. 1197 """ 1198 orig_stdout = sys.stdout 1199 sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding) 1200 try: 1201 yield sys.stdout 1202 finally: 1203 sys.stdout = orig_stdout 1204 1205 1206class CGIHandlerTestCase(unittest.TestCase): 1207 def setUp(self): 1208 self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler() 1209 1210 def tearDown(self): 1211 self.cgi = None 1212 1213 def test_cgi_get(self): 1214 with support.EnvironmentVarGuard() as env: 1215 env['REQUEST_METHOD'] = 'GET' 1216 # if the method is GET and no request_text is given, it runs handle_get 1217 # get sysout output 1218 with captured_stdout(encoding=self.cgi.encoding) as data_out: 1219 self.cgi.handle_request() 1220 1221 # parse Status header 1222 data_out.seek(0) 1223 handle = data_out.read() 1224 status = handle.split()[1] 1225 message = ' '.join(handle.split()[2:4]) 1226 1227 self.assertEqual(status, '400') 1228 self.assertEqual(message, 'Bad Request') 1229 1230 1231 def test_cgi_xmlrpc_response(self): 1232 data = """<?xml version='1.0'?> 1233 <methodCall> 1234 <methodName>test_method</methodName> 1235 <params> 1236 <param> 1237 <value><string>foo</string></value> 1238 </param> 1239 <param> 1240 <value><string>bar</string></value> 1241 </param> 1242 </params> 1243 </methodCall> 1244 """ 1245 1246 with support.EnvironmentVarGuard() as env, \ 1247 captured_stdout(encoding=self.cgi.encoding) as data_out, \ 1248 support.captured_stdin() as data_in: 1249 data_in.write(data) 1250 data_in.seek(0) 1251 env['CONTENT_LENGTH'] = str(len(data)) 1252 self.cgi.handle_request() 1253 data_out.seek(0) 1254 1255 # will respond exception, if so, our goal is achieved ;) 1256 handle = data_out.read() 1257 1258 # start with 44th char so as not to get http header, we just 1259 # need only xml 1260 self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:]) 1261 1262 # Also test the content-length returned by handle_request 1263 # Using the same test method inorder to avoid all the datapassing 1264 # boilerplate code. 1265 # Test for bug: http://bugs.python.org/issue5040 1266 1267 content = handle[handle.find("<?xml"):] 1268 1269 self.assertEqual( 1270 int(re.search(r'Content-Length: (\d+)', handle).group(1)), 1271 len(content)) 1272 1273 1274class UseBuiltinTypesTestCase(unittest.TestCase): 1275 1276 def test_use_builtin_types(self): 1277 # SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which 1278 # makes all dispatch of binary data as bytes instances, and all 1279 # dispatch of datetime argument as datetime.datetime instances. 1280 self.log = [] 1281 expected_bytes = b"my dog has fleas" 1282 expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12) 1283 marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar') 1284 def foobar(*args): 1285 self.log.extend(args) 1286 handler = xmlrpc.server.SimpleXMLRPCDispatcher( 1287 allow_none=True, encoding=None, use_builtin_types=True) 1288 handler.register_function(foobar) 1289 handler._marshaled_dispatch(marshaled) 1290 self.assertEqual(len(self.log), 2) 1291 mybytes, mydate = self.log 1292 self.assertEqual(self.log, [expected_bytes, expected_date]) 1293 self.assertIs(type(mydate), datetime.datetime) 1294 self.assertIs(type(mybytes), bytes) 1295 1296 def test_cgihandler_has_use_builtin_types_flag(self): 1297 handler = xmlrpc.server.CGIXMLRPCRequestHandler(use_builtin_types=True) 1298 self.assertTrue(handler.use_builtin_types) 1299 1300 def test_xmlrpcserver_has_use_builtin_types_flag(self): 1301 server = xmlrpc.server.SimpleXMLRPCServer(("localhost", 0), 1302 use_builtin_types=True) 1303 server.server_close() 1304 self.assertTrue(server.use_builtin_types) 1305 1306 1307@support.reap_threads 1308def test_main(): 1309 support.run_unittest(XMLRPCTestCase, HelperTestCase, DateTimeTestCase, 1310 BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase, 1311 SimpleServerTestCase, SimpleServerEncodingTestCase, 1312 KeepaliveServerTestCase1, KeepaliveServerTestCase2, 1313 GzipServerTestCase, GzipUtilTestCase, 1314 MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase, 1315 CGIHandlerTestCase) 1316 1317 1318if __name__ == "__main__": 1319 test_main() 1320