1#!/usr/bin/env python
2
3# Authors:
4#   Trevor Perrin
5#   Kees Bos - Added tests for XML-RPC
6#   Dimitris Moraitis - Anon ciphersuites
7#   Marcelo Fernandez - Added test for NPN
8#   Martin von Loewis - python 3 port
9
10#
11# See the LICENSE file for legal information regarding use of this file.
12from __future__ import print_function
13import sys
14import os
15import os.path
16import socket
17import time
18import getopt
19try:
20    from BaseHTTPServer import HTTPServer
21    from SimpleHTTPServer import SimpleHTTPRequestHandler
22except ImportError:
23    from http.server import HTTPServer, SimpleHTTPRequestHandler
24
25from tlslite import TLSConnection, Fault, HandshakeSettings, \
26    X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \
27    parsePEMKey, constants, \
28    AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \
29    POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \
30    Checker, __version__
31
32from tlslite.errors import *
33from tlslite.utils.cryptomath import prngName
34try:
35    import xmlrpclib
36except ImportError:
37    # Python 3
38    from xmlrpc import client as xmlrpclib
39from tlslite import *
40
41try:
42    from tack.structures.Tack import Tack
43
44except ImportError:
45    pass
46
def printUsage(s=None):
    """Print the usage banner and terminate the process.

    If ``s`` is truthy it is printed first as an ``ERROR:`` line. The banner
    reports the tlslite ``__version__`` and which crypto backend label applies
    (based on ``m2cryptoLoaded``). Always exits via ``sys.exit(-1)``.
    """
    # Label the backend according to whether M2Crypto was importable.
    crypto = "M2Crypto/OpenSSL" if m2cryptoLoaded else "Python crypto"

    if s:
        print("ERROR: %s" % s)

    banner = """\ntls.py version %s (using %s)

Commands:
  server HOST:PORT DIRECTORY

  client HOST:PORT DIRECTORY
"""
    print(banner % (__version__, crypto))
    sys.exit(-1)
62
63
64def testConnClient(conn):
65    b1 = os.urandom(1)
66    b10 = os.urandom(10)
67    b100 = os.urandom(100)
68    b1000 = os.urandom(1000)
69    conn.write(b1)
70    conn.write(b10)
71    conn.write(b100)
72    conn.write(b1000)
73    assert(conn.read(min=1, max=1) == b1)
74    assert(conn.read(min=10, max=10) == b10)
75    assert(conn.read(min=100, max=100) == b100)
76    assert(conn.read(min=1000, max=1000) == b1000)
77
def clientTestCmd(argv):
    """Run the client half of the integration test sequence.

    ``argv[0]`` is a ``HOST:PORT`` string and ``argv[1]`` is a directory that
    must contain ``clientX509Cert.pem``, ``clientX509Key.pem`` and
    ``index.html``. The tests are run strictly in order against a peer that
    is expected to be running ``serverTestCmd`` with the same address; the
    port is bumped by one for the HTTPS phase, once more for the remaining
    raw-TLS tests, and once more for the XML-RPC phase.

    Failed expectations raise ``AssertionError``. Fault-injection tests only
    record a failure (``TLSFaultError``) and keep going; at the end either
    "Test succeeded" or "Test failed" is printed accordingly.
    """

    address = argv[0]
    dir = argv[1]

    #Split address into hostname/port tuple
    address = address.split(":")
    address = ( address[0], int(address[1]) )

    # time.clock() was removed in Python 3.8; use perf_counter() when it
    # exists and only fall back to clock() on interpreters that lack it.
    timer = getattr(time, "perf_counter", None) or time.clock

    def readFile(name):
        # Read a file from the test directory, closing the handle promptly.
        with open(os.path.join(dir, name)) as f:
            return f.read()

    def connect():
        # NOTE: reads the enclosing 'address' at call time, so later
        # re-assignments of 'address' (port bumps) affect new connections.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        if hasattr(sock, 'settimeout'): #It's a python 2.3 feature
            sock.settimeout(5)
        sock.connect(address)
        c = TLSConnection(sock)
        return c

    # Set when any fault-injection test raises TLSFaultError.
    badFault = False

    print("Test 0 - anonymous handshake")
    connection = connect()
    connection.handshakeClientAnonymous()
    testConnClient(connection)
    connection.close()

    print("Test 1 - good X509 (plus SNI)")
    connection = connect()
    connection.handshakeClientCert(serverName=address[0])
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    assert(connection.session.serverName == address[0])
    connection.close()

    print("Test 1.a - good X509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeClientCert(settings=settings)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 1.b - good X509, RC4-MD5")
    connection = connect()
    settings = HandshakeSettings()
    settings.macNames = ["md5"]
    connection.handshakeClientCert(settings=settings)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)
    connection.close()

    if tackpyLoaded:

        settings = HandshakeSettings()
        settings.useExperimentalTackExtension = True

        print("Test 2.a - good X.509, TACK")
        connection = connect()
        connection.handshakeClientCert(settings=settings)
        assert(connection.session.tackExt.tacks[0].getTackId() == "rrted.ptvtl.d2uiq.ox2xe.w4ss3")
        assert(connection.session.tackExt.activation_flags == 1)
        testConnClient(connection)
        connection.close()

        print("Test 2.b - good X.509, TACK unrelated to cert chain")
        connection = connect()
        try:
            connection.handshakeClientCert(settings=settings)
            assert(False)
        except TLSLocalAlert as alert:
            # Only an illegal_parameter alert is the expected outcome.
            if alert.description != AlertDescription.illegal_parameter:
                raise
        connection.close()

    print("Test 3 - good SRP")
    connection = connect()
    connection.handshakeClientSRP("test", "password")
    testConnClient(connection)
    connection.close()

    print("Test 4 - SRP faults")
    for fault in Fault.clientSrpFaults + Fault.genericFaults:
        connection = connect()
        connection.fault = fault
        try:
            connection.handshakeClientSRP("test", "password")
            print("  Good Fault %s" % (Fault.faultNames[fault]))
        except TLSFaultError as e:
            print("  BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
            badFault = True

    print("Test 6 - good SRP: with X.509 certificate, TLSv1.0")
    settings = HandshakeSettings()
    settings.minVersion = (3,1)
    settings.maxVersion = (3,1)
    connection = connect()
    connection.handshakeClientSRP("test", "password", settings=settings)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    testConnClient(connection)
    connection.close()

    print("Test 7 - X.509 with SRP faults")
    for fault in Fault.clientSrpFaults + Fault.genericFaults:
        connection = connect()
        connection.fault = fault
        try:
            connection.handshakeClientSRP("test", "password")
            print("  Good Fault %s" % (Fault.faultNames[fault]))
        except TLSFaultError as e:
            print("  BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
            badFault = True

    print("Test 11 - X.509 faults")
    for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
        connection = connect()
        connection.fault = fault
        try:
            connection.handshakeClientCert()
            print("  Good Fault %s" % (Fault.faultNames[fault]))
        except TLSFaultError as e:
            print("  BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
            badFault = True

    print("Test 14 - good mutual X509")
    x509Cert = X509().parse(readFile("clientX509Cert.pem"))
    x509Chain = X509CertChain([x509Cert])
    s = readFile("clientX509Key.pem")
    x509Key = parsePEMKey(s, private=True)

    connection = connect()
    connection.handshakeClientCert(x509Chain, x509Key)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 14.a - good mutual X509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
    testConnClient(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 15 - mutual X.509 faults")
    for fault in Fault.clientCertFaults + Fault.genericFaults:
        connection = connect()
        connection.fault = fault
        try:
            connection.handshakeClientCert(x509Chain, x509Key)
            print("  Good Fault %s" % (Fault.faultNames[fault]))
        except TLSFaultError as e:
            print("  BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
            badFault = True

    print("Test 18 - good SRP, prepare to resume... (plus SNI)")
    connection = connect()
    connection.handshakeClientSRP("test", "password", serverName=address[0])
    testConnClient(connection)
    connection.close()
    # Keep the session so the next two tests can attempt to resume it.
    session = connection.session

    print("Test 19 - resumption (plus SNI)")
    connection = connect()
    # The password is deliberately wrong: the handshake passes the saved
    # session and is expected to succeed anyway.
    connection.handshakeClientSRP("test", "garbage", serverName=address[0],
                                    session=session)
    testConnClient(connection)
    #Don't close! -- see below

    print("Test 20 - invalidated resumption (plus SNI)")
    connection.sock.close() #Close the socket without a close_notify!
    connection = connect()
    try:
        connection.handshakeClientSRP("test", "garbage",
                        serverName=address[0], session=session)
        assert(False)
    except TLSRemoteAlert as alert:
        # Only a bad_record_mac alert from the peer is the expected outcome.
        if alert.description != AlertDescription.bad_record_mac:
            raise
    connection.close()

    print("Test 21 - HTTPS test X.509")
    # The HTTPS server listens on the next port up.
    address = address[0], address[1]+1
    if hasattr(socket, "timeout"):
        timeoutEx = socket.timeout
    else:
        timeoutEx = socket.error
    # Retry the whole HTTPS exchange until it completes without a timeout.
    while 1:
        try:
            time.sleep(2)
            htmlBody = bytearray(readFile("index.html"), "utf-8")
            fingerprint = None
            # First pass learns the server fingerprint (checker has none);
            # second pass passes the learned fingerprint to the Checker.
            for y in range(2):
                checker =Checker(x509Fingerprint=fingerprint)
                h = HTTPTLSConnection(\
                        address[0], address[1], checker=checker)
                for x in range(3):
                    h.request("GET", "/index.html")
                    r = h.getresponse()
                    assert(r.status == 200)
                    b = bytearray(r.read())
                    assert(b == htmlBody)
                fingerprint = h.tlsSession.serverCertChain.getFingerprint()
                assert(fingerprint)
            time.sleep(2)
            break
        except timeoutEx:
            print("timeout, retrying...")
            pass

    # Remaining raw-TLS tests use the next port up again.
    address = address[0], address[1]+1

    implementations = []
    if m2cryptoLoaded:
        implementations.append("openssl")
    if pycryptoLoaded:
        implementations.append("pycrypto")
    implementations.append("python")

    print("Test 22 - different ciphers, TLSv1.0")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "rc4"]:

            print("Test 22:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]
            settings.minVersion = (3,1)
            settings.maxVersion = (3,1)
            connection.handshakeClientCert(settings=settings)
            testConnClient(connection)
            print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
            connection.close()

    print("Test 23 - throughput test")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "3des", "rc4"]:
            # 3des is only exercised with the openssl/pycrypto backends.
            if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
                continue

            print("Test 23:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]
            connection.handshakeClientCert(settings=settings)
            print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ')

            # 50000 bytes out plus 50000 bytes echoed back = 100K exchanged.
            startTime = timer()
            connection.write(b"hello"*10000)
            h = connection.read(min=50000, max=50000)
            stopTime = timer()
            # Guard against a zero interval before dividing.
            if stopTime-startTime:
                print("100K exchanged at rate of %d bytes/sec" % int(100000/(stopTime-startTime)))
            else:
                print("100K exchanged very fast")

            assert(h == b"hello"*10000)
            connection.close()

    print("Test 24.a - Next-Protocol Client Negotiation")
    connection = connect()
    connection.handshakeClientCert(nextProtos=[b"http/1.1"])
    #print("  Next-Protocol Negotiated: %s" % connection.next_proto)
    assert(connection.next_proto == b'http/1.1')
    connection.close()

    print("Test 24.b - Next-Protocol Client Negotiation")
    connection = connect()
    connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
    #print("  Next-Protocol Negotiated: %s" % connection.next_proto)
    assert(connection.next_proto == b'spdy/2')
    connection.close()

    print("Test 24.c - Next-Protocol Client Negotiation")
    connection = connect()
    connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
    #print("  Next-Protocol Negotiated: %s" % connection.next_proto)
    assert(connection.next_proto == b'spdy/2')
    connection.close()

    print("Test 24.d - Next-Protocol Client Negotiation")
    connection = connect()
    connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
    #print("  Next-Protocol Negotiated: %s" % connection.next_proto)
    assert(connection.next_proto == b'spdy/2')
    connection.close()

    print("Test 24.e - Next-Protocol Client Negotiation")
    connection = connect()
    connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
    #print("  Next-Protocol Negotiated: %s" % connection.next_proto)
    assert(connection.next_proto == b'spdy/3')
    connection.close()

    print("Test 24.f - Next-Protocol Client Negotiation")
    connection = connect()
    connection.handshakeClientCert(nextProtos=[b"http/1.1"])
    #print("  Next-Protocol Negotiated: %s" % connection.next_proto)
    assert(connection.next_proto == b'http/1.1')
    connection.close()

    print("Test 24.g - Next-Protocol Client Negotiation")
    connection = connect()
    connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"])
    #print("  Next-Protocol Negotiated: %s" % connection.next_proto)
    assert(connection.next_proto == b'spdy/2')
    connection.close()

    print('Test 25 - good standard XMLRPC https client')
    time.sleep(2) # Hack for lack of ability to set timeout here
    # The XML-RPC server listens on the next port up.
    address = address[0], address[1]+1
    server = xmlrpclib.Server('https://%s:%s' % address)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print('Test 26 - good tlslite XMLRPC client')
    transport = XMLRPCTransport(ignoreAbruptClose=True)
    server = xmlrpclib.Server('https://%s:%s' % address, transport)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print('Test 27 - good XMLRPC ignored protocol')
    server = xmlrpclib.Server('http://%s:%s' % address, transport)
    assert server.add(1,2) == 3
    assert server.pow(2,4) == 16

    print("Test 28 - Internet servers test")
    try:
        i = IMAP4_TLS("cyrus.andrew.cmu.edu")
        i.login("anonymous", "anonymous@anonymous.net")
        i.logout()
        print("Test 28: IMAP4 good")
        p = POP3_TLS("pop.gmail.com")
        p.quit()
        print("Test 29: POP3 good")
    except socket.error as e:
        # Reaching external servers is best-effort; do not fail the run.
        print("Non-critical error: socket error trying to reach internet server: ", e)

    if not badFault:
        print("Test succeeded")
    else:
        print("Test failed")
429
430
431
432def testConnServer(connection):
433    count = 0
434    while 1:
435        s = connection.read()
436        count += len(s)
437        if len(s) == 0:
438            break
439        connection.write(s)
440        if count == 1111:
441            break
442
def serverTestCmd(argv):
    """Run the server half of the integration test sequence.

    ``argv[0]`` is a ``HOST:PORT`` string to listen on and ``argv[1]`` is a
    directory that must contain ``serverX509Cert.pem`` and
    ``serverX509Key.pem``; it is also used as the working directory while
    serving the HTTPS test. When ``tackpyLoaded`` is true, ``./TACK1.pem``
    and ``./TACKunrelated.pem`` are read relative to the current working
    directory.

    The tests are run strictly in order and are meant to be driven by a peer
    running ``clientTestCmd`` with the same address. The port is bumped by
    one for the HTTPS phase, once more for the remaining raw-TLS tests, and
    once more for the XML-RPC phase. Failed expectations raise
    ``AssertionError``; "Test succeeded" is printed at the end.
    """

    address = argv[0]
    dir = argv[1]

    #Split address into hostname/port tuple
    address = address.split(":")
    address = ( address[0], int(address[1]) )

    #Create the listening socket
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    lsock.bind(address)
    lsock.listen(5)

    def connect():
        # NOTE: reads the enclosing 'lsock' at call time, so the re-created
        # listening socket further down is picked up automatically.
        return TLSConnection(lsock.accept()[0])

    def readFile(path, mode="r"):
        # Read a whole file, closing the handle promptly.
        with open(path, mode) as f:
            return f.read()

    x509Cert = X509().parse(readFile(os.path.join(dir, "serverX509Cert.pem")))
    x509Chain = X509CertChain([x509Cert])
    s = readFile(os.path.join(dir, "serverX509Key.pem"))
    x509Key = parsePEMKey(s, private=True)

    print("Test 0 - Anonymous server handshake")
    connection = connect()
    connection.handshakeServer(anon=True)
    testConnServer(connection)
    connection.close()

    print("Test 1 - good X.509")
    connection = connect()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
    assert(connection.session.serverName == address[0])
    testConnServer(connection)
    connection.close()

    print("Test 1.a - good X.509, SSL v3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
    testConnServer(connection)
    connection.close()

    print("Test 1.b - good X.509, RC4-MD5")
    connection = connect()
    settings = HandshakeSettings()
    settings.macNames = ["sha", "md5"]
    settings.cipherNames = ["rc4"]
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
    testConnServer(connection)
    connection.close()

    if tackpyLoaded:
        # The "U" open mode was removed in Python 3.11 (ValueError); Python 3
        # text mode already uses universal newlines, so only Python 2 needs it.
        tackMode = "rU" if sys.version_info[0] < 3 else "r"
        tack = Tack.createFromPem(readFile("./TACK1.pem", tackMode))
        tackUnrelated = Tack.createFromPem(readFile("./TACKunrelated.pem", tackMode))

        settings = HandshakeSettings()
        settings.useExperimentalTackExtension = True

        print("Test 2.a - good X.509, TACK")
        connection = connect()
        connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
            tacks=[tack], activationFlags=1, settings=settings)
        testConnServer(connection)
        connection.close()

        print("Test 2.b - good X.509, TACK unrelated to cert chain")
        connection = connect()
        try:
            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                tacks=[tackUnrelated], settings=settings)
            assert(False)
        except TLSRemoteAlert as alert:
            # Only an illegal_parameter alert from the peer is expected.
            if alert.description != AlertDescription.illegal_parameter:
                raise

    print("Test 3 - good SRP")
    verifierDB = VerifierDB()
    verifierDB.create()
    entry = VerifierDB.makeVerifier("test", "password", 1536)
    verifierDB["test"] = entry

    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB)
    testConnServer(connection)
    connection.close()

    # In the fault tests below every error -- including the AssertionError
    # raised when the handshake unexpectedly succeeds -- is deliberately
    # ignored; the client side is the one that judges each fault.
    # 'except Exception' (rather than a bare except) keeps Ctrl-C working.

    print("Test 4 - SRP faults")
    for fault in Fault.clientSrpFaults + Fault.genericFaults:
        connection = connect()
        connection.fault = fault
        try:
            connection.handshakeServer(verifierDB=verifierDB)
            assert(False)
        except Exception:
            pass
        connection.close()

    print("Test 6 - good SRP: with X.509 cert")
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB, \
                               certChain=x509Chain, privateKey=x509Key)
    testConnServer(connection)
    connection.close()

    print("Test 7 - X.509 with SRP faults")
    for fault in Fault.clientSrpFaults + Fault.genericFaults:
        connection = connect()
        connection.fault = fault
        try:
            connection.handshakeServer(verifierDB=verifierDB, \
                                       certChain=x509Chain, privateKey=x509Key)
            assert(False)
        except Exception:
            pass
        connection.close()

    print("Test 11 - X.509 faults")
    for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
        connection = connect()
        connection.fault = fault
        try:
            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
            assert(False)
        except Exception:
            pass
        connection.close()

    print("Test 14 - good mutual X.509")
    connection = connect()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
    testConnServer(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 14a - good mutual X.509, SSLv3")
    connection = connect()
    settings = HandshakeSettings()
    settings.minVersion = (3,0)
    settings.maxVersion = (3,0)
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
    testConnServer(connection)
    assert(isinstance(connection.session.serverCertChain, X509CertChain))
    connection.close()

    print("Test 15 - mutual X.509 faults")
    for fault in Fault.clientCertFaults + Fault.genericFaults:
        connection = connect()
        connection.fault = fault
        try:
            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
            assert(False)
        except Exception:
            pass
        connection.close()

    print("Test 18 - good SRP, prepare to resume")
    sessionCache = SessionCache()
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
    assert(connection.session.serverName == address[0])
    testConnServer(connection)
    connection.close()

    print("Test 19 - resumption")
    connection = connect()
    connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
    assert(connection.session.serverName == address[0])
    testConnServer(connection)
    #Don't close! -- see next test

    print("Test 20 - invalidated resumption")
    try:
        connection.read(min=1, max=1)
        assert(False) #Client is going to close the socket without a close_notify
    except TLSAbruptCloseError as e:
        pass
    connection = connect()
    try:
        connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
    except TLSLocalAlert as alert:
        # Only a locally generated bad_record_mac alert is tolerated here.
        if alert.description != AlertDescription.bad_record_mac:
            raise
    connection.close()

    print("Test 21 - HTTPS test X.509")

    #Close the current listening socket
    lsock.close()

    #Create and run an HTTP Server using TLSSocketServerMixIn
    class MyHTTPServer(TLSSocketServerMixIn,
                       HTTPServer):
        def handshake(self, tlsConnection):
            tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
            return True

    # Serve files from the test directory; always restore the previous
    # working directory afterwards, even if the HTTPS phase fails.
    cd = os.getcwd()
    os.chdir(dir)
    try:
        address = address[0], address[1]+1
        httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
        try:
            # The client makes 2 connections x 3 requests each.
            for x in range(6):
                httpd.handle_request()
        finally:
            httpd.server_close()
    finally:
        os.chdir(cd)

    #Re-connect the listening socket
    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    address = address[0], address[1]+1
    lsock.bind(address)
    lsock.listen(5)

    implementations = []
    if m2cryptoLoaded:
        implementations.append("openssl")
    if pycryptoLoaded:
        implementations.append("pycrypto")
    implementations.append("python")

    print("Test 22 - different ciphers")
    # One round per implementation the client will try, but this side always
    # requests the "python" implementation.
    for implementation in ["python"] * len(implementations):
        for cipher in ["aes128", "aes256", "rc4"]:

            print("Test 22:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]

            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                                        settings=settings)
            print(connection.getCipherName(), connection.getCipherImplementation())
            testConnServer(connection)
            connection.close()

    print("Test 23 - throughput test")
    for implementation in implementations:
        for cipher in ["aes128", "aes256", "3des", "rc4"]:
            # 3des is only exercised with the openssl/pycrypto backends.
            if cipher == "3des" and implementation not in ("openssl", "pycrypto"):
                continue

            print("Test 23:", end=' ')
            connection = connect()

            settings = HandshakeSettings()
            settings.cipherNames = [cipher]
            settings.cipherImplementations = [implementation, "python"]

            connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                                        settings=settings)
            print(connection.getCipherName(), connection.getCipherImplementation())
            # Receive the client's 50000-byte payload and echo it back.
            h = connection.read(min=50000, max=50000)
            assert(h == b"hello"*10000)
            connection.write(h)
            connection.close()

    print("Test 24.a - Next-Protocol Server Negotiation")
    connection = connect()
    settings = HandshakeSettings()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               settings=settings, nextProtos=[b"http/1.1"])
    testConnServer(connection)
    connection.close()

    print("Test 24.b - Next-Protocol Server Negotiation")
    connection = connect()
    settings = HandshakeSettings()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
    testConnServer(connection)
    connection.close()

    print("Test 24.c - Next-Protocol Server Negotiation")
    connection = connect()
    settings = HandshakeSettings()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               settings=settings, nextProtos=[b"http/1.1", b"spdy/2"])
    testConnServer(connection)
    connection.close()

    print("Test 24.d - Next-Protocol Server Negotiation")
    connection = connect()
    settings = HandshakeSettings()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
    testConnServer(connection)
    connection.close()

    print("Test 24.e - Next-Protocol Server Negotiation")
    connection = connect()
    settings = HandshakeSettings()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"])
    testConnServer(connection)
    connection.close()

    print("Test 24.f - Next-Protocol Server Negotiation")
    connection = connect()
    settings = HandshakeSettings()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               settings=settings, nextProtos=[b"spdy/3", b"spdy/2"])
    testConnServer(connection)
    connection.close()

    print("Test 24.g - Next-Protocol Server Negotiation")
    connection = connect()
    settings = HandshakeSettings()
    connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
                               settings=settings, nextProtos=[])
    testConnServer(connection)
    connection.close()

    # No further raw-TLS connections are accepted; release the listener.
    lsock.close()

    print("Tests 25-27 - XMLRPXC server")
    address = address[0], address[1]+1
    class Server(TLSXMLRPCServer):

        def handshake(self, tlsConnection):
            try:
                tlsConnection.handshakeServer(certChain=x509Chain,
                                              privateKey=x509Key,
                                              sessionCache=sessionCache)
                tlsConnection.ignoreAbruptClose = True
                return True
            except TLSError as error:
                print("Handshake failure:", str(error))
                return False

    class MyFuncs:
        def pow(self, x, y): return pow(x, y)
        def add(self, x, y): return x + y

    server = Server(address)
    server.register_instance(MyFuncs())
    #sa = server.socket.getsockname()
    #print "Serving HTTPS on", sa[0], "port", sa[1]
    # The client issues 3 tests x 2 calls each.
    for i in range(6):
        server.handle_request()

    print("Test succeeded")
783
784
if __name__ == '__main__':
    # Dispatch on the first command-line argument. Any prefix of "client" or
    # "server" is accepted; the remaining arguments go to the test command.
    cliArgs = sys.argv[1:]
    if not cliArgs:
        printUsage("Missing command")
    elif "client".startswith(cliArgs[0]):
        clientTestCmd(cliArgs[1:])
    elif "server".startswith(cliArgs[0]):
        serverTestCmd(cliArgs[1:])
    else:
        printUsage("Unknown command: %s" % cliArgs[0])
794