1#!/usr/bin/env python 2 3# Authors: 4# Trevor Perrin 5# Kees Bos - Added tests for XML-RPC 6# Dimitris Moraitis - Anon ciphersuites 7# Marcelo Fernandez - Added test for NPN 8# Martin von Loewis - python 3 port 9 10# 11# See the LICENSE file for legal information regarding use of this file. 12from __future__ import print_function 13import sys 14import os 15import os.path 16import socket 17import time 18import getopt 19try: 20 from BaseHTTPServer import HTTPServer 21 from SimpleHTTPServer import SimpleHTTPRequestHandler 22except ImportError: 23 from http.server import HTTPServer, SimpleHTTPRequestHandler 24 25from tlslite import TLSConnection, Fault, HandshakeSettings, \ 26 X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \ 27 parsePEMKey, constants, \ 28 AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \ 29 POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \ 30 Checker, __version__ 31 32from tlslite.errors import * 33from tlslite.utils.cryptomath import prngName 34try: 35 import xmlrpclib 36except ImportError: 37 # Python 3 38 from xmlrpc import client as xmlrpclib 39from tlslite import * 40 41try: 42 from tack.structures.Tack import Tack 43 44except ImportError: 45 pass 46 47def printUsage(s=None): 48 if m2cryptoLoaded: 49 crypto = "M2Crypto/OpenSSL" 50 else: 51 crypto = "Python crypto" 52 if s: 53 print("ERROR: %s" % s) 54 print("""\ntls.py version %s (using %s) 55 56Commands: 57 server HOST:PORT DIRECTORY 58 59 client HOST:PORT DIRECTORY 60""" % (__version__, crypto)) 61 sys.exit(-1) 62 63 64def testConnClient(conn): 65 b1 = os.urandom(1) 66 b10 = os.urandom(10) 67 b100 = os.urandom(100) 68 b1000 = os.urandom(1000) 69 conn.write(b1) 70 conn.write(b10) 71 conn.write(b100) 72 conn.write(b1000) 73 assert(conn.read(min=1, max=1) == b1) 74 assert(conn.read(min=10, max=10) == b10) 75 assert(conn.read(min=100, max=100) == b100) 76 assert(conn.read(min=1000, max=1000) == b1000) 77 78def clientTestCmd(argv): 79 80 address = argv[0] 81 dir = argv[1] 82 83 #Split address into hostname/port tuple 84 address = address.split(":") 85 address = ( address[0], int(address[1]) ) 86 87 def connect(): 88 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 89 if hasattr(sock, 'settimeout'): #It's a python 2.3 feature 90 sock.settimeout(5) 91 sock.connect(address) 92 c = TLSConnection(sock) 93 return c 94 95 test = 0 96 97 badFault = False 98 99 print("Test 0 - anonymous handshake") 100 connection = connect() 101 connection.handshakeClientAnonymous() 102 testConnClient(connection) 103 connection.close() 104 105 print("Test 1 - good X509 (plus SNI)") 106 connection = connect() 107 connection.handshakeClientCert(serverName=address[0]) 108 testConnClient(connection) 109 assert(isinstance(connection.session.serverCertChain, X509CertChain)) 110 assert(connection.session.serverName == address[0]) 111 connection.close() 112 113 print("Test 1.a - good X509, SSLv3") 114 connection = connect() 115 settings = HandshakeSettings() 116 settings.minVersion = (3,0) 117 settings.maxVersion = (3,0) 118 connection.handshakeClientCert(settings=settings) 119 testConnClient(connection) 120 assert(isinstance(connection.session.serverCertChain, X509CertChain)) 121 connection.close() 122 123 print("Test 1.b - good X509, RC4-MD5") 124 connection = connect() 125 settings = HandshakeSettings() 126 settings.macNames = ["md5"] 127 connection.handshakeClientCert(settings=settings) 128 testConnClient(connection) 129 assert(isinstance(connection.session.serverCertChain, X509CertChain)) 130 assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5) 131 connection.close() 132 133 if tackpyLoaded: 134 135 settings = HandshakeSettings() 136 settings.useExperimentalTackExtension = True 137 138 print("Test 2.a - good X.509, TACK") 139 connection = connect() 140 connection.handshakeClientCert(settings=settings) 141 assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3") 142 assert(connection.session.tackExt.activation_flags == 1) 143 testConnClient(connection) 144 connection.close() 145 146 print("Test 2.b - good X.509, TACK unrelated to cert chain") 147 connection = connect() 148 try: 149 connection.handshakeClientCert(settings=settings) 150 assert(False) 151 except TLSLocalAlert as alert: 152 if alert.description != AlertDescription.illegal_parameter: 153 raise 154 connection.close() 155 156 print("Test 3 - good SRP") 157 connection = connect() 158 connection.handshakeClientSRP("test", "password") 159 testConnClient(connection) 160 connection.close() 161 162 print("Test 4 - SRP faults") 163 for fault in Fault.clientSrpFaults + Fault.genericFaults: 164 connection = connect() 165 connection.fault = fault 166 try: 167 connection.handshakeClientSRP("test", "password") 168 print(" Good Fault %s" % (Fault.faultNames[fault])) 169 except TLSFaultError as e: 170 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) 171 badFault = True 172 173 print("Test 6 - good SRP: with X.509 certificate, TLSv1.0") 174 settings = HandshakeSettings() 175 settings.minVersion = (3,1) 176 settings.maxVersion = (3,1) 177 connection = connect() 178 connection.handshakeClientSRP("test", "password", settings=settings) 179 assert(isinstance(connection.session.serverCertChain, X509CertChain)) 180 testConnClient(connection) 181 connection.close() 182 183 print("Test 7 - X.509 with SRP faults") 184 for fault in Fault.clientSrpFaults + Fault.genericFaults: 185 connection = connect() 186 connection.fault = fault 187 try: 188 connection.handshakeClientSRP("test", "password") 189 print(" Good Fault %s" % (Fault.faultNames[fault])) 190 except TLSFaultError as e: 191 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) 192 badFault = True 193 194 print("Test 11 - X.509 faults") 195 for fault in Fault.clientNoAuthFaults + Fault.genericFaults: 196 connection = connect() 197 connection.fault = fault 198 try: 199 connection.handshakeClientCert() 200 print(" Good Fault %s" % (Fault.faultNames[fault])) 201 except TLSFaultError as e: 202 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) 203 badFault = True 204 205 print("Test 14 - good mutual X509") 206 x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read()) 207 x509Chain = X509CertChain([x509Cert]) 208 s = open(os.path.join(dir, "clientX509Key.pem")).read() 209 x509Key = parsePEMKey(s, private=True) 210 211 connection = connect() 212 connection.handshakeClientCert(x509Chain, x509Key) 213 testConnClient(connection) 214 assert(isinstance(connection.session.serverCertChain, X509CertChain)) 215 connection.close() 216 217 print("Test 14.a - good mutual X509, SSLv3") 218 connection = connect() 219 settings = HandshakeSettings() 220 settings.minVersion = (3,0) 221 settings.maxVersion = (3,0) 222 connection.handshakeClientCert(x509Chain, x509Key, settings=settings) 223 testConnClient(connection) 224 assert(isinstance(connection.session.serverCertChain, X509CertChain)) 225 connection.close() 226 227 print("Test 15 - mutual X.509 faults") 228 for fault in Fault.clientCertFaults + Fault.genericFaults: 229 connection = connect() 230 connection.fault = fault 231 try: 232 connection.handshakeClientCert(x509Chain, x509Key) 233 print(" Good Fault %s" % (Fault.faultNames[fault])) 234 except TLSFaultError as e: 235 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) 236 badFault = True 237 238 print("Test 18 - good SRP, prepare to resume... (plus SNI)") 239 connection = connect() 240 connection.handshakeClientSRP("test", "password", serverName=address[0]) 241 testConnClient(connection) 242 connection.close() 243 session = connection.session 244 245 print("Test 19 - resumption (plus SNI)") 246 connection = connect() 247 connection.handshakeClientSRP("test", "garbage", serverName=address[0], 248 session=session) 249 testConnClient(connection) 250 #Don't close! -- see below 251 252 print("Test 20 - invalidated resumption (plus SNI)") 253 connection.sock.close() #Close the socket without a close_notify! 254 connection = connect() 255 try: 256 connection.handshakeClientSRP("test", "garbage", 257 serverName=address[0], session=session) 258 assert(False) 259 except TLSRemoteAlert as alert: 260 if alert.description != AlertDescription.bad_record_mac: 261 raise 262 connection.close() 263 264 print("Test 21 - HTTPS test X.509") 265 address = address[0], address[1]+1 266 if hasattr(socket, "timeout"): 267 timeoutEx = socket.timeout 268 else: 269 timeoutEx = socket.error 270 while 1: 271 try: 272 time.sleep(2) 273 htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "utf-8") 274 fingerprint = None 275 for y in range(2): 276 checker =Checker(x509Fingerprint=fingerprint) 277 h = HTTPTLSConnection(\ 278 address[0], address[1], checker=checker) 279 for x in range(3): 280 h.request("GET", "/index.html") 281 r = h.getresponse() 282 assert(r.status == 200) 283 b = bytearray(r.read()) 284 assert(b == htmlBody) 285 fingerprint = h.tlsSession.serverCertChain.getFingerprint() 286 assert(fingerprint) 287 time.sleep(2) 288 break 289 except timeoutEx: 290 print("timeout, retrying...") 291 pass 292 293 address = address[0], address[1]+1 294 295 implementations = [] 296 if m2cryptoLoaded: 297 implementations.append("openssl") 298 if pycryptoLoaded: 299 implementations.append("pycrypto") 300 implementations.append("python") 301 302 print("Test 22 - different ciphers, TLSv1.0") 303 for implementation in implementations: 304 for cipher in ["aes128", "aes256", "rc4"]: 305 306 print("Test 22:", end=' ') 307 connection = connect() 308 309 settings = HandshakeSettings() 310 settings.cipherNames = [cipher] 311 settings.cipherImplementations = [implementation, "python"] 312 settings.minVersion = (3,1) 313 settings.maxVersion = (3,1) 314 connection.handshakeClientCert(settings=settings) 315 testConnClient(connection) 316 print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation())) 317 connection.close() 318 319 print("Test 23 - throughput test") 320 for implementation in implementations: 321 for cipher in ["aes128", "aes256", "3des", "rc4"]: 322 if cipher == "3des" and implementation not in ("openssl", "pycrypto"): 323 continue 324 325 print("Test 23:", end=' ') 326 connection = connect() 327 328 settings = HandshakeSettings() 329 settings.cipherNames = [cipher] 330 settings.cipherImplementations = [implementation, "python"] 331 connection.handshakeClientCert(settings=settings) 332 print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ') 333 334 startTime = time.clock() 335 connection.write(b"hello"*10000) 336 h = connection.read(min=50000, max=50000) 337 stopTime = time.clock() 338 if stopTime-startTime: 339 print("100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime))) 340 else: 341 print("100K exchanged very fast") 342 343 assert(h == b"hello"*10000) 344 connection.close() 345 346 print("Test 24.a - Next-Protocol Client Negotiation") 347 connection = connect() 348 connection.handshakeClientCert(nextProtos=[b"http/1.1"]) 349 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) 350 assert(connection.next_proto == b'http/1.1') 351 connection.close() 352 353 print("Test 24.b - Next-Protocol Client Negotiation") 354 connection = connect() 355 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"]) 356 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) 357 assert(connection.next_proto == b'spdy/2') 358 connection.close() 359 360 print("Test 24.c - Next-Protocol Client Negotiation") 361 connection = connect() 362 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"]) 363 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) 364 assert(connection.next_proto == b'spdy/2') 365 connection.close() 366 367 print("Test 24.d - Next-Protocol Client Negotiation") 368 connection = connect() 369 connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"]) 370 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) 371 assert(connection.next_proto == b'spdy/2') 372 connection.close() 373 374 print("Test 24.e - Next-Protocol Client Negotiation") 375 connection = connect() 376 connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"]) 377 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) 378 assert(connection.next_proto == b'spdy/3') 379 connection.close() 380 381 print("Test 24.f - Next-Protocol Client Negotiation") 382 connection = connect() 383 connection.handshakeClientCert(nextProtos=[b"http/1.1"]) 384 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) 385 assert(connection.next_proto == b'http/1.1') 386 connection.close() 387 388 print("Test 24.g - Next-Protocol Client Negotiation") 389 connection = connect() 390 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"]) 391 #print(" Next-Protocol Negotiated: %s" % connection.next_proto) 392 assert(connection.next_proto == b'spdy/2') 393 connection.close() 394 395 print('Test 25 - good standard XMLRPC https client') 396 time.sleep(2) # Hack for lack of ability to set timeout here 397 address = address[0], address[1]+1 398 server = xmlrpclib.Server('https://%s:%s' % address) 399 assert server.add(1,2) == 3 400 assert server.pow(2,4) == 16 401 402 print('Test 26 - good tlslite XMLRPC client') 403 transport = XMLRPCTransport(ignoreAbruptClose=True) 404 server = xmlrpclib.Server('https://%s:%s' % address, transport) 405 assert server.add(1,2) == 3 406 assert server.pow(2,4) == 16 407 408 print('Test 27 - good XMLRPC ignored protocol') 409 server = xmlrpclib.Server('http://%s:%s' % address, transport) 410 assert server.add(1,2) == 3 411 assert server.pow(2,4) == 16 412 413 print("Test 28 - Internet servers test") 414 try: 415 i = IMAP4_TLS("cyrus.andrew.cmu.edu") 416 i.login("anonymous", "anonymous@anonymous.net") 417 i.logout() 418 print("Test 28: IMAP4 good") 419 p = POP3_TLS("pop.gmail.com") 420 p.quit() 421 print("Test 29: POP3 good") 422 except socket.error as e: 423 print("Non-critical error: socket error trying to reach internet server: ", e) 424 425 if not badFault: 426 print("Test succeeded") 427 else: 428 print("Test failed") 429 430 431 432def testConnServer(connection): 433 count = 0 434 while 1: 435 s = connection.read() 436 count += len(s) 437 if len(s) == 0: 438 break 439 connection.write(s) 440 if count == 1111: 441 break 442 443def serverTestCmd(argv): 444 445 address = argv[0] 446 dir = argv[1] 447 448 #Split address into hostname/port tuple 449 address = address.split(":") 450 address = ( address[0], int(address[1]) ) 451 452 #Connect to server 453 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 454 lsock.bind(address) 455 lsock.listen(5) 456 457 def connect(): 458 return TLSConnection(lsock.accept()[0]) 459 460 x509Cert = X509().parse(open(os.path.join(dir, "serverX509Cert.pem")).read()) 461 x509Chain = X509CertChain([x509Cert]) 462 s = open(os.path.join(dir, "serverX509Key.pem")).read() 463 x509Key = parsePEMKey(s, private=True) 464 465 print("Test 0 - Anonymous server handshake") 466 connection = connect() 467 connection.handshakeServer(anon=True) 468 testConnServer(connection) 469 connection.close() 470 471 print("Test 1 - good X.509") 472 connection = connect() 473 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) 474 assert(connection.session.serverName == address[0]) 475 testConnServer(connection) 476 connection.close() 477 478 print("Test 1.a - good X.509, SSL v3") 479 connection = connect() 480 settings = HandshakeSettings() 481 settings.minVersion = (3,0) 482 settings.maxVersion = (3,0) 483 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) 484 testConnServer(connection) 485 connection.close() 486 487 print("Test 1.b - good X.509, RC4-MD5") 488 connection = connect() 489 settings = HandshakeSettings() 490 settings.macNames = ["sha", "md5"] 491 settings.cipherNames = ["rc4"] 492 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) 493 testConnServer(connection) 494 connection.close() 495 496 if tackpyLoaded: 497 tack = Tack.createFromPem(open("./TACK1.pem", "rU").read()) 498 tackUnrelated = Tack.createFromPem(open("./TACKunrelated.pem", "rU").read()) 499 500 settings = HandshakeSettings() 501 settings.useExperimentalTackExtension = True 502 503 print("Test 2.a - good X.509, TACK") 504 connection = connect() 505 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 506 tacks=[tack], activationFlags=1, settings=settings) 507 testConnServer(connection) 508 connection.close() 509 510 print("Test 2.b - good X.509, TACK unrelated to cert chain") 511 connection = connect() 512 try: 513 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 514 tacks=[tackUnrelated], settings=settings) 515 assert(False) 516 except TLSRemoteAlert as alert: 517 if alert.description != AlertDescription.illegal_parameter: 518 raise 519 520 print("Test 3 - good SRP") 521 verifierDB = VerifierDB() 522 verifierDB.create() 523 entry = VerifierDB.makeVerifier("test", "password", 1536) 524 verifierDB["test"] = entry 525 526 connection = connect() 527 connection.handshakeServer(verifierDB=verifierDB) 528 testConnServer(connection) 529 connection.close() 530 531 print("Test 4 - SRP faults") 532 for fault in Fault.clientSrpFaults + Fault.genericFaults: 533 connection = connect() 534 connection.fault = fault 535 try: 536 connection.handshakeServer(verifierDB=verifierDB) 537 assert() 538 except: 539 pass 540 connection.close() 541 542 print("Test 6 - good SRP: with X.509 cert") 543 connection = connect() 544 connection.handshakeServer(verifierDB=verifierDB, \ 545 certChain=x509Chain, privateKey=x509Key) 546 testConnServer(connection) 547 connection.close() 548 549 print("Test 7 - X.509 with SRP faults") 550 for fault in Fault.clientSrpFaults + Fault.genericFaults: 551 connection = connect() 552 connection.fault = fault 553 try: 554 connection.handshakeServer(verifierDB=verifierDB, \ 555 certChain=x509Chain, privateKey=x509Key) 556 assert() 557 except: 558 pass 559 connection.close() 560 561 print("Test 11 - X.509 faults") 562 for fault in Fault.clientNoAuthFaults + Fault.genericFaults: 563 connection = connect() 564 connection.fault = fault 565 try: 566 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) 567 assert() 568 except: 569 pass 570 connection.close() 571 572 print("Test 14 - good mutual X.509") 573 connection = connect() 574 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True) 575 testConnServer(connection) 576 assert(isinstance(connection.session.serverCertChain, X509CertChain)) 577 connection.close() 578 579 print("Test 14a - good mutual X.509, SSLv3") 580 connection = connect() 581 settings = HandshakeSettings() 582 settings.minVersion = (3,0) 583 settings.maxVersion = (3,0) 584 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings) 585 testConnServer(connection) 586 assert(isinstance(connection.session.serverCertChain, X509CertChain)) 587 connection.close() 588 589 print("Test 15 - mutual X.509 faults") 590 for fault in Fault.clientCertFaults + Fault.genericFaults: 591 connection = connect() 592 connection.fault = fault 593 try: 594 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True) 595 assert() 596 except: 597 pass 598 connection.close() 599 600 print("Test 18 - good SRP, prepare to resume") 601 sessionCache = SessionCache() 602 connection = connect() 603 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) 604 assert(connection.session.serverName == address[0]) 605 testConnServer(connection) 606 connection.close() 607 608 print("Test 19 - resumption") 609 connection = connect() 610 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) 611 assert(connection.session.serverName == address[0]) 612 testConnServer(connection) 613 #Don't close! -- see next test 614 615 print("Test 20 - invalidated resumption") 616 try: 617 connection.read(min=1, max=1) 618 assert() #Client is going to close the socket without a close_notify 619 except TLSAbruptCloseError as e: 620 pass 621 connection = connect() 622 try: 623 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) 624 except TLSLocalAlert as alert: 625 if alert.description != AlertDescription.bad_record_mac: 626 raise 627 connection.close() 628 629 print("Test 21 - HTTPS test X.509") 630 631 #Close the current listening socket 632 lsock.close() 633 634 #Create and run an HTTP Server using TLSSocketServerMixIn 635 class MyHTTPServer(TLSSocketServerMixIn, 636 HTTPServer): 637 def handshake(self, tlsConnection): 638 tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key) 639 return True 640 cd = os.getcwd() 641 os.chdir(dir) 642 address = address[0], address[1]+1 643 httpd = MyHTTPServer(address, SimpleHTTPRequestHandler) 644 for x in range(6): 645 httpd.handle_request() 646 httpd.server_close() 647 cd = os.chdir(cd) 648 649 #Re-connect the listening socket 650 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 651 address = address[0], address[1]+1 652 lsock.bind(address) 653 lsock.listen(5) 654 655 implementations = [] 656 if m2cryptoLoaded: 657 implementations.append("openssl") 658 if pycryptoLoaded: 659 implementations.append("pycrypto") 660 implementations.append("python") 661 662 print("Test 22 - different ciphers") 663 for implementation in ["python"] * len(implementations): 664 for cipher in ["aes128", "aes256", "rc4"]: 665 666 print("Test 22:", end=' ') 667 connection = connect() 668 669 settings = HandshakeSettings() 670 settings.cipherNames = [cipher] 671 settings.cipherImplementations = [implementation, "python"] 672 673 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 674 settings=settings) 675 print(connection.getCipherName(), connection.getCipherImplementation()) 676 testConnServer(connection) 677 connection.close() 678 679 print("Test 23 - throughput test") 680 for implementation in implementations: 681 for cipher in ["aes128", "aes256", "3des", "rc4"]: 682 if cipher == "3des" and implementation not in ("openssl", "pycrypto"): 683 continue 684 685 print("Test 23:", end=' ') 686 connection = connect() 687 688 settings = HandshakeSettings() 689 settings.cipherNames = [cipher] 690 settings.cipherImplementations = [implementation, "python"] 691 692 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 693 settings=settings) 694 print(connection.getCipherName(), connection.getCipherImplementation()) 695 h = connection.read(min=50000, max=50000) 696 assert(h == b"hello"*10000) 697 connection.write(h) 698 connection.close() 699 700 print("Test 24.a - Next-Protocol Server Negotiation") 701 connection = connect() 702 settings = HandshakeSettings() 703 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 704 settings=settings, nextProtos=[b"http/1.1"]) 705 testConnServer(connection) 706 connection.close() 707 708 print("Test 24.b - Next-Protocol Server Negotiation") 709 connection = connect() 710 settings = HandshakeSettings() 711 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 712 settings=settings, nextProtos=[b"spdy/2", b"http/1.1"]) 713 testConnServer(connection) 714 connection.close() 715 716 print("Test 24.c - Next-Protocol Server Negotiation") 717 connection = connect() 718 settings = HandshakeSettings() 719 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 720 settings=settings, nextProtos=[b"http/1.1", b"spdy/2"]) 721 testConnServer(connection) 722 connection.close() 723 724 print("Test 24.d - Next-Protocol Server Negotiation") 725 connection = connect() 726 settings = HandshakeSettings() 727 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 728 settings=settings, nextProtos=[b"spdy/2", b"http/1.1"]) 729 testConnServer(connection) 730 connection.close() 731 732 print("Test 24.e - Next-Protocol Server Negotiation") 733 connection = connect() 734 settings = HandshakeSettings() 735 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 736 settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"]) 737 testConnServer(connection) 738 connection.close() 739 740 print("Test 24.f - Next-Protocol Server Negotiation") 741 connection = connect() 742 settings = HandshakeSettings() 743 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 744 settings=settings, nextProtos=[b"spdy/3", b"spdy/2"]) 745 testConnServer(connection) 746 connection.close() 747 748 print("Test 24.g - Next-Protocol Server Negotiation") 749 connection = connect() 750 settings = HandshakeSettings() 751 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 752 settings=settings, nextProtos=[]) 753 testConnServer(connection) 754 connection.close() 755 756 print("Tests 25-27 - XMLRPXC server") 757 address = address[0], address[1]+1 758 class Server(TLSXMLRPCServer): 759 760 def handshake(self, tlsConnection): 761 try: 762 tlsConnection.handshakeServer(certChain=x509Chain, 763 privateKey=x509Key, 764 sessionCache=sessionCache) 765 tlsConnection.ignoreAbruptClose = True 766 return True 767 except TLSError as error: 768 print("Handshake failure:", str(error)) 769 return False 770 771 class MyFuncs: 772 def pow(self, x, y): return pow(x, y) 773 def add(self, x, y): return x + y 774 775 server = Server(address) 776 server.register_instance(MyFuncs()) 777 #sa = server.socket.getsockname() 778 #print "Serving HTTPS on", sa[0], "port", sa[1] 779 for i in range(6): 780 server.handle_request() 781 782 print("Test succeeded") 783 784 785if __name__ == '__main__': 786 if len(sys.argv) < 2: 787 printUsage("Missing command") 788 elif sys.argv[1] == "client"[:len(sys.argv[1])]: 789 clientTestCmd(sys.argv[2:]) 790 elif sys.argv[1] == "server"[:len(sys.argv[1])]: 791 serverTestCmd(sys.argv[2:]) 792 else: 793 printUsage("Unknown command: %s" % sys.argv[1]) 794