tlstest.py
#!/usr/bin/env python
# Authors:
# Trevor Perrin
# Kees Bos - Added tests for XML-RPC
# Dimitris Moraitis - Anon ciphersuites
# Marcelo Fernandez - Added test for NPN
# Martin von Loewis - python 3 port
# Hubert Kario - several improvements
# Google - FALLBACK_SCSV test
# Efthimis Iosifidis - improvemnts of time measurement in Throughput Test
#
#
# See the LICENSE file for legal information regarding use of this file.
from __future__ import print_function
import sys
import os
import os.path
import socket
import time
import timeit
import getopt
from tempfile import mkstemp
try:
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
except ImportError:
from http.server import HTTPServer, SimpleHTTPRequestHandler
from tlslite import TLSConnection, Fault, HandshakeSettings, \
X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \
parsePEMKey, constants, \
AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \
POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \
Checker, __version__
from tlslite.handshakesettings import VirtualHost, Keypair
from tlslite.errors import *
from tlslite.utils.cryptomath import prngName, getRandomBytes
try:
import xmlrpclib
except ImportError:
# Python 3
from xmlrpc import client as xmlrpclib
import ssl
from tlslite import *
from tlslite.constants import KeyUpdateMessageType
try:
from tack.structures.Tack import Tack
except ImportError:
pass
def printUsage(s=None):
if m2cryptoLoaded:
crypto = "M2Crypto/OpenSSL"
else:
crypto = "Python crypto"
if s:
print("ERROR: %s" % s)
print("""\ntls.py version %s (using %s)
Commands:
server HOST:PORT DIRECTORY
client HOST:PORT DIRECTORY
""" % (__version__, crypto))
sys.exit(-1)
def testConnClient(conn):
b1 = os.urandom(1)
b10 = os.urandom(10)
b100 = os.urandom(100)
b1000 = os.urandom(1000)
conn.write(b1)
conn.write(b10)
conn.write(b100)
conn.write(b1000)
r1 = conn.read(min=1, max=1)
assert len(r1) == 1
assert r1 == b1
r10 = conn.read(min=10, max=10)
assert len(r10) == 10
assert r10 == b10
r100 = conn.read(min=100, max=100)
assert len(r100) == 100
assert r100 == b100
r1000 = conn.read(min=1000, max=1000)
assert len(r1000) == 1000
assert r1000 == b1000
def clientTestCmd(argv):
address = argv[0]
dir = argv[1]
#Split address into hostname/port tuple
address = address.split(":")
address = ( address[0], int(address[1]) )
#open synchronisation FIFO
synchro = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
synchro.settimeout(60)
synchro.connect((address[0], address[1]-1))
def connect():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(15)
sock.connect(address)
c = TLSConnection(sock)
return c
test_no = 0
badFault = False
print("Test {0} - anonymous handshake".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientAnonymous(settings=settings)
testConnClient(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 (plus SNI)".format(test_no))
synchro.recv(1)
connection = connect()
connection.handshakeClientCert(serverName=address[0])
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
assert(connection.session.serverName == address[0])
assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites)
assert(connection.encryptThenMAC == False)
assert connection.session.appProto is None
connection.close()
test_no += 1
print("Test {0} - good X.509 TLSv1.2 (plus ALPN)".format(test_no))
synchro.recv(1)
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection = connect()
connection.handshakeClientCert(serverName=address[0],
alpn=[b'http/1.1'],
settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverName == address[0]
assert connection.session.cipherSuite in constants.CipherSuite.aeadSuites
assert connection.encryptThenMAC == False
assert connection.session.appProto == b'http/1.1'
connection.close()
test_no += 1
print("Test {0} - good X.509 TLSv1.3 (plus ALPN)".format(test_no))
synchro.recv(1)
connection = connect()
connection.handshakeClientCert(serverName=address[0],
alpn=[b'http/1.1'])
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverName == address[0]
assert connection.session.cipherSuite in constants.CipherSuite.aeadSuites
assert connection.encryptThenMAC == False
assert connection.session.appProto == b'http/1.1'
connection.close()
test_no += 1
print("Test {0} - good X.509/w RSA-PSS sig".format(test_no))
synchro.recv(1)
connection = connect()
connection.handshakeClientCert(serverName=address[0])
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
assert(connection.session.serverName == address[0])
assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites)
assert(connection.encryptThenMAC == False)
connection.close()
test_no += 1
print("Test {0} - good X.509/w RSA-PSS cert".format(test_no))
synchro.recv(1)
connection = connect()
connection.handshakeClientCert(serverName=address[0])
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
assert(connection.session.serverName == address[0])
assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites)
assert(connection.encryptThenMAC == False)
connection.close()
test_no += 1
print("Test {0} - good X.509/w RSA-PSS cert in TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeClientCert(serverName=address[0], settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
assert(connection.session.serverName == address[0])
assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites)
assert(connection.encryptThenMAC == False)
connection.close()
test_no += 1
print("Test {0} - good X.509, small record_size_limit".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.record_size_limit = 64
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - good X.509, SSLv3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,0)
settings.maxVersion = (3,0)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - good X.509 ECDSA, SSLv3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 0)
settings.maxVersion = (3, 0)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.ecdheEcdsaSuites
assert isinstance(connection.session.serverCertChain, X509CertChain)
connection.close()
test_no += 1
print("Test {0} - good X.509 ECDSA, TLSv1.0".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 1)
settings.maxVersion = (3, 1)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.ecdheEcdsaSuites
assert isinstance(connection.session.serverCertChain, X509CertChain)
connection.close()
test_no += 1
print("Test {0} - good X.509 ECDSA, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.ecdheEcdsaSuites
assert isinstance(connection.session.serverCertChain, X509CertChain)
connection.close()
test_no += 1
print("Test {0} - mismatched ECDSA curve, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.eccCurves = ["secp384r1"]
settings.keyShares = []
try:
connection.handshakeClientCert(settings=settings)
assert False
except TLSRemoteAlert as e:
assert "handshake_failure" in str(e)
connection.close()
test_no += 1
for curve, keySize in (("brainpoolP256r1", 256),
("brainpoolP384r1", 384),
("brainpoolP512r1", 512)):
print("Test {0} - Two good ECDSA certs - {1}, TLSv1.2".format(test_no, curve))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.eccCurves = [curve]
settings.keyShares = []
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \
== keySize
connection.close()
test_no += 1
print("Test {0} - Two good ECDSA certs - secp256r1, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.eccCurves = ["secp256r1"]
settings.keyShares = []
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \
== 256
connection.close()
test_no += 1
print("Test {0} - Two good ECDSA certs - secp384r1, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.eccCurves = ["secp384r1"]
settings.keyShares = []
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \
== 384
connection.close()
test_no += 1
print("Test {0} - good X509 RSA and ECDSA, correct RSA and ECDSA sigalgs, RSA, TLSv1.2"
.format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.rsaSigHashes = ["sha256"]
settings.ecdsaSigHashes = ["sha256"]
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "rsa"
assert connection.version == (3, 3)
connection.close()
test_no += 1
print("Test {0} - good X509 RSA and ECDSA, bad RSA and good ECDSA sigalgs, ECDSA, TLSv1.2"
.format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.rsaSigHashes = ["sha384"]
settings.ecdsaSigHashes = ["sha256"]
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "ecdsa"
assert connection.version == (3, 3)
connection.close()
test_no += 1
print("Test {0} - good X509 RSA and ECDSA, bad RSA and ECDSA sigalgs, RSA, TLSv1.2"
.format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.rsaSigHashes = ["sha384"]
settings.ecdsaSigHashes = ["sha384"]
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "rsa"
assert connection.version == (3, 3)
connection.close()
test_no += 1
print("Test {0} - good X509 RSA and ECDSA, correct RSA and ECDSA sigalgs, RSA, TLSv1.3"
.format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
settings.rsaSigHashes = ["sha256"]
settings.ecdsaSigHashes = ["sha256"]
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "rsa"
assert connection.version == (3, 4)
connection.close()
test_no += 1
print("Test {0} - good X509 RSA and ECDSA, bad RSA and good ECDSA sigalgs, ECDSA, TLSv1.3"
.format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
settings.rsaSigHashes = ["sha384"]
settings.ecdsaSigHashes = ["sha256"]
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "ecdsa"
assert connection.version == (3, 4)
connection.close()
test_no += 1
print("Test {0} - good X509 RSA and ECDSA, bad RSA and ECDSA sigalgs, RSA, TLSv1.3"
.format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
settings.rsaSigHashes = ["sha384"]
settings.ecdsaSigHashes = ["sha384"]
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "rsa"
assert connection.version == (3, 4)
connection.close()
test_no += 1
print("Test {0} - good X.509 ECDSA, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.tls13Suites
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \
== 256
connection.close()
test_no += 1
print("Test {0} - mismatched ECDSA curve, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
settings.ecdsaSigHashes = ["sha384", "sha512"]
try:
connection.handshakeClientCert(settings=settings)
assert False
except TLSRemoteAlert as e:
assert "handshake_failure" in str(e)
connection.close()
test_no += 1
print("Test {0} - good X.509 P-384 ECDSA, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.tls13Suites
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \
== 384
connection.close()
test_no += 1
print("Test {0} - good X.509 P-521 ECDSA, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.tls13Suites
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \
== 521
connection.close()
test_no += 1
print("Test {0} - good X.509 Ed25519, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.tls13Suites
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \
== "Ed25519"
connection.close()
test_no += 1
print("Test {0} - good X.509 Ed448, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.tls13Suites
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \
== "Ed448"
connection.close()
test_no += 1
print("Test {0} - good RSA and ECDSA, TLSv1.3, rsa"
.format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.tls13Suites
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "rsa"
assert connection.version == (3, 4)
connection.close()
test_no += 1
print("Test {0} - good RSA and ECDSA, TLSv1.3, ecdsa"
.format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
settings.rsaSigHashes = []
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.tls13Suites
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "ecdsa"
assert connection.version == (3, 4)
connection.close()
test_no += 1
print("Test {0} - good RSA and ECDSA, TLSv1.2, rsa"
.format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.ecdheCertSuites, connection.session.cipherSuite
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "rsa"
assert connection.version == (3, 3)
connection.close()
test_no += 1
print("Test {0} - good RSA and ECDSA, TLSv1.2, ecdsa"
.format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.rsaSigHashes = []
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.ecdheEcdsaSuites, connection.session.cipherSuite
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "ecdsa"
assert connection.version == (3, 3)
connection.close()
test_no += 1
print("Test {0} - good X.509, mismatched key_share".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.keyShares = ["x25519"]
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - good X.509, RC4-MD5".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.macNames = ["md5"]
settings.cipherNames = ["rc4"]
settings.maxVersion = (3, 3)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)
assert(connection.encryptThenMAC == False)
connection.close()
if tackpyLoaded:
settings = HandshakeSettings()
settings.useExperimentalTackExtension = True
settings.maxVersion = (3, 3)
test_no += 1
print("Test {0} - good X.509, TACK".format(test_no))
synchro.recv(1)
connection = connect()
connection.handshakeClientCert(settings=settings)
assert(connection.session.tackExt.tacks[0].getTackId() == "5lcbe.eyweo.yxuan.rw6xd.jtoz7")
assert(connection.session.tackExt.activation_flags == 1)
testConnClient(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509, TACK unrelated to cert chain".\
format(test_no))
synchro.recv(1)
connection = connect()
try:
connection.handshakeClientCert(settings=settings)
assert False
except TLSLocalAlert as alert:
if alert.description != AlertDescription.illegal_parameter:
raise
connection.close()
else:
test_no += 1
print("Test {0} - good X.509, TACK...skipped (no tackpy)".\
format(test_no))
test_no += 1
print("Test {0} - good X.509, TACK unrelated to cert chain...skipped"
" (no tackpy)".\
format(test_no))
test_no += 1
print("Test {0} - good PSK".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]
connection.handshakeClientCert(settings=settings)
assert connection.session.serverCertChain is None
assert connection.ecdhCurve is not None
testConnClient(connection)
connection.close()
test_no += 1
print("Test {0} - good PSK, no DH".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.psk_modes = ["psk_ke"]
settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]
connection.handshakeClientCert(settings=settings)
assert connection.session.serverCertChain is None
assert connection.ecdhCurve is None
testConnClient(connection)
connection.close()
test_no += 1
print("Test {0} - good PSK, no DH, no cert".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.psk_modes = ["psk_ke"]
settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]
connection.handshakeClientCert(settings=settings)
assert connection.session.serverCertChain is None
assert connection.ecdhCurve is None
testConnClient(connection)
connection.close()
test_no += 1
print("Test {0} - good SRP (db)".format(test_no))
print("client {0} - waiting for synchro".format(time.time()))
try:
synchro.recv(1)
except Exception:
print("client {0} - wait abort".format(time.time()))
raise
print("client {0} - synchro received".format(time.time()))
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientSRP("test", "password", settings=settings)
testConnClient(connection)
connection.close()
test_no += 1
print("Test {0} - good SRP".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientSRP("test", "password", settings=settings)
testConnClient(connection)
connection.close()
test_no += 1
print("Test {0} - SRP faults".format(test_no))
for fault in Fault.clientSrpFaults + Fault.genericFaults:
synchro.recv(1)
connection = connect()
connection.fault = fault
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
try:
connection.handshakeClientSRP("test", "password",
settings=settings)
print(" Good Fault %s" % (Fault.faultNames[fault]))
except TLSFaultError as e:
print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
badFault = True
test_no += 1
print("Test {0} - good SRP: with X.509 certificate, TLSv1.0".format(test_no))
settings = HandshakeSettings()
settings.minVersion = (3,1)
settings.maxVersion = (3,1)
synchro.recv(1)
connection = connect()
connection.handshakeClientSRP("test", "password", settings=settings)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
testConnClient(connection)
connection.close()
test_no += 1
print("Test {0} - X.509 with SRP faults".format(test_no))
for fault in Fault.clientSrpFaults + Fault.genericFaults:
synchro.recv(1)
connection = connect()
connection.fault = fault
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
try:
connection.handshakeClientSRP("test", "password",
settings=settings)
print(" Good Fault %s" % (Fault.faultNames[fault]))
except TLSFaultError as e:
print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
badFault = True
test_no += 1
print("Test {0} - X.509 faults".format(test_no))
for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
synchro.recv(1)
connection = connect()
connection.fault = fault
try:
connection.handshakeClientCert()
print(" Good Fault %s" % (Fault.faultNames[fault]))
except TLSFaultError as e:
print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
badFault = True
test_no += 1
print("Test {0} - good mutual X.509".format(test_no))
x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())
x509Chain = X509CertChain([x509Cert])
s = open(os.path.join(dir, "clientX509Key.pem")).read()
x509Key = parsePEMKey(s, private=True)
synchro.recv(1)
connection = connect()
connection.handshakeClientCert(x509Chain, x509Key)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
connection.close()
test_no += 1
print("Test {0} - good mutual ECDSA X.509".format(test_no))
with open(os.path.join(dir, "clientECCert.pem")) as f:
x509Cert = X509().parse(f.read())
x509Chain = X509CertChain([x509Cert])
with open(os.path.join(dir, "clientECKey.pem")) as f:
x509Key = parsePEMKey(f.read(), private=True)
synchro.recv(1)
connection = connect()
connection.handshakeClientCert(x509Chain, x509Key)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert len(connection.session.serverCertChain.getEndEntityPublicKey()) ==\
256
connection.close()
test_no += 1
print("Test {0} - good mutual Ed25519 X.509".format(test_no))
with open(os.path.join(dir, "clientEd25519Cert.pem")) as f:
x509EdCert = X509().parse(f.read())
x509EdChain = X509CertChain([x509EdCert])
with open(os.path.join(dir, "clientEd25519Key.pem")) as f:
x509EdKey = parsePEMKey(f.read(), private=True)
synchro.recv(1)
connection = connect()
connection.handshakeClientCert(x509EdChain, x509EdKey)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "Ed25519"
connection.close()
test_no += 1
print("Test {0} - good mutual Ed25519 X.509, TLS 1.2".format(test_no))
with open(os.path.join(dir, "clientEd25519Cert.pem")) as f:
x509EdCert = X509().parse(f.read())
x509EdChain = X509CertChain([x509EdCert])
with open(os.path.join(dir, "clientEd25519Key.pem")) as f:
x509EdKey = parsePEMKey(f.read(), private=True)
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeClientCert(x509EdChain, x509EdKey, settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "Ed25519"
connection.close()
test_no += 1
print("Test {0} - good X.509 DSA, SSLv3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 0)
settings.maxVersion = (3, 0)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.dheDsaSuites
assert isinstance(connection.session.serverCertChain, X509CertChain)
connection.close()
test_no += 1
print("Test {0} - good X.509 DSA, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.dheDsaSuites
assert isinstance(connection.session.serverCertChain, X509CertChain)
connection.close()
test_no += 1
print("Test {0} - good X.509 Ed25519, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.ecdheEcdsaSuites
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \
== "Ed25519"
connection.close()
test_no += 1
print("Test {0} - good X.509 Ed448, TLSv1.2".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert connection.session.cipherSuite in\
constants.CipherSuite.ecdheEcdsaSuites
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \
== "Ed448"
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, TLSv1.3 no certs".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,4)
settings.maxVersion = (3,4)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,4)
settings.maxVersion = (3,4)
connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, PHA, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
synchro.recv(1)
b = connection.read(0, 0)
assert b == b''
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - good mutual X.509 Ed25519, PHA, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeClientCert(x509EdChain, x509EdKey, settings=settings)
synchro.recv(1)
b = connection.read(0, 0)
assert b == b''
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\
== "Ed25519"
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, PHA and KeyUpdate, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
for result in connection.send_keyupdate_request(
KeyUpdateMessageType.update_requested):
assert result in (0, 1)
synchro.recv(1)
b = connection.read(0, 0)
assert b == b''
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - mutual X.509, PHA, no client cert, TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeClientCert(X509CertChain(), x509Key, settings=settings)
synchro.recv(1)
b = connection.read(0, 0)
assert b == b''
try:
connection.read(0, 0)
assert False
except TLSRemoteAlert as e:
assert e.description == AlertDescription.certificate_required
assert "certificate_required" in str(e), str(e)
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, TLSv1.1".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,2)
settings.maxVersion = (3,2)
connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, SSLv3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,0)
settings.maxVersion = (3,0)
connection.handshakeClientCert(x509Chain, x509Key, settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - mutual X.509 faults".format(test_no))
for fault in Fault.clientCertFaults + Fault.genericFaults:
synchro.recv(1)
connection = connect()
connection.fault = fault
try:
connection.handshakeClientCert(x509Chain, x509Key)
print(" Good Fault %s" % (Fault.faultNames[fault]))
except TLSFaultError as e:
print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))
badFault = True
test_no += 1
print("Test {0} - good SRP, prepare to resume... (plus SNI)".\
format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientSRP("test", "password", serverName=address[0],
settings=settings)
testConnClient(connection)
connection.close()
session = connection.session
test_no += 1
print("Test {0} - resumption (plus SNI)".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientSRP("test", "garbage", serverName=address[0],
session=session, settings=settings)
testConnClient(connection)
#Don't close! -- see below
test_no += 1
print("Test {0} - invalidated resumption (plus SNI)".format(test_no))
synchro.recv(1)
connection.sock.close() #Close the socket without a close_notify!
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
try:
connection.handshakeClientSRP("test", "garbage",
serverName=address[0],
session=session, settings=settings)
assert False
except TLSRemoteAlert as alert:
if alert.description != AlertDescription.bad_record_mac:
raise
connection.close()
test_no += 1
print("Test {0} - HTTPS test X.509".format(test_no))
address = address[0], address[1]+1
if hasattr(socket, "timeout"):
timeoutEx = socket.timeout
else:
timeoutEx = socket.error
while 1:
try:
htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "utf-8")
fingerprint = None
for y in range(2):
checker =Checker(x509Fingerprint=fingerprint)
h = HTTPTLSConnection(\
address[0], address[1], checker=checker)
for x in range(3):
synchro.recv(1)
h.request("GET", "/index.html")
r = h.getresponse()
assert(r.status == 200)
b = bytearray(r.read())
assert(b == htmlBody)
fingerprint = h.tlsSession.serverCertChain.getFingerprint()
assert(fingerprint)
break
except timeoutEx:
print("timeout, retrying...")
pass
address = address[0], address[1]+1
implementations = []
if m2cryptoLoaded:
implementations.append("openssl")
if pycryptoLoaded:
implementations.append("pycrypto")
implementations.append("python")
test_no += 1
print("Test {0} - different ciphers, TLSv1.0".format(test_no))
for implementation in implementations:
for cipher in ["aes128", "aes256", "rc4"]:
test_no += 1
print("Test {0}:".format(test_no), end=' ')
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
settings.minVersion = (3,1)
settings.maxVersion = (3,1)
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))
connection.close()
test_no += 1
print("Test {0} - throughput test".format(test_no))
for implementation in implementations:
for cipher in ["aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8",
"aes128gcm", "aes256gcm", "aes128", "aes256", "3des",
"rc4", "chacha20-poly1305_draft00",
"chacha20-poly1305"]:
# skip tests with implementations that don't support them
if cipher == "3des" and implementation not in ("openssl",
"pycrypto"):
continue
if cipher in ("aes128gcm", "aes256gcm") and \
implementation not in ("pycrypto",
"python", "openssl"):
continue
if cipher in ("aes128ccm", "aes128ccm_8",
"aes256ccm", "aes256ccm_8") and \
implementation not in ("python", "openssl"):
continue
if cipher in ("chacha20-poly1305_draft00", "chacha20-poly1305") \
and implementation not in ("python", ):
continue
test_no += 1
print("Test {0}:".format(test_no), end=' ')
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
if cipher not in ("aes128ccm", "aes128ccm_8", "aes128gcm",
"aes256gcm", "chacha20-poly1305"):
settings.maxVersion = (3, 3)
connection.handshakeClientCert(settings=settings)
print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ')
startTime = timeit.default_timer()
connection.write(b"hello"*10000)
h = connection.read(min=50000, max=50000)
stopTime = timeit.default_timer()
sizeofdata = len(h)*2
if stopTime-startTime:
print("100K exchanged at rate of %d bytes/sec" % int(sizeofdata/(stopTime-startTime)))
else:
print("100K exchanged very fast")
assert(h == b"hello"*10000)
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Client Negotiation".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientCert(nextProtos=[b"http/1.1"], settings=settings)
#print(" Next-Protocol Negotiated: %s" % connection.next_proto)
assert(connection.next_proto == b'http/1.1')
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Client Negotiation".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"],
settings=settings)
#print(" Next-Protocol Negotiated: %s" % connection.next_proto)
assert(connection.next_proto == b'spdy/2')
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Client Negotiation".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"],
settings=settings)
#print(" Next-Protocol Negotiated: %s" % connection.next_proto)
assert(connection.next_proto == b'spdy/2')
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Client Negotiation".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2",
b"http/1.1"],
settings=settings)
#print(" Next-Protocol Negotiated: %s" % connection.next_proto)
assert(connection.next_proto == b'spdy/2')
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Client Negotiation".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2",
b"http/1.1"],
settings=settings)
#print(" Next-Protocol Negotiated: %s" % connection.next_proto)
assert(connection.next_proto == b'spdy/3')
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Client Negotiation".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientCert(nextProtos=[b"http/1.1"], settings=settings)
#print(" Next-Protocol Negotiated: %s" % connection.next_proto)
assert(connection.next_proto == b'http/1.1')
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Client Negotiation".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"],
settings=settings)
#print(" Next-Protocol Negotiated: %s" % connection.next_proto)
assert(connection.next_proto == b'spdy/2')
connection.close()
test_no += 1
print("Test {0} - FALLBACK_SCSV".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.sendFallbackSCSV = True
settings.maxVersion = (3, 3)
# TODO fix FALLBACK_SCSV with TLS 1.3
connection.handshakeClientCert(settings=settings)
testConnClient(connection)
connection.close()
test_no += 1
print("Test {0} - FALLBACK_SCSV".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.sendFallbackSCSV = True
settings.maxVersion = (3, 2)
try:
connection.handshakeClientCert(settings=settings)
assert False
except TLSRemoteAlert as alert:
if alert.description != AlertDescription.inappropriate_fallback:
raise
connection.close()
test_no += 1
print("Test {0} - no EtM server side".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.macNames.remove("aead")
settings.maxVersion = (3, 3)
assert(settings.useEncryptThenMAC)
connection.handshakeClientCert(serverName=address[0], settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
assert(connection.session.serverName == address[0])
assert(not connection.encryptThenMAC)
connection.close()
test_no += 1
print("Test {0} - no EtM client side".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.macNames.remove("aead")
settings.useEncryptThenMAC = False
settings.maxVersion = (3, 3)
connection.handshakeClientCert(serverName=address[0], settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
assert(connection.session.serverName == address[0])
assert(not connection.encryptThenMAC)
connection.close()
test_no += 1
print("Test {0} - resumption with EtM".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.macNames.remove("aead")
settings.maxVersion = (3, 3)
connection.handshakeClientCert(serverName=address[0], settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
assert(connection.session.serverName == address[0])
assert(not connection.resumed)
assert(connection.encryptThenMAC)
connection.close()
session = connection.session
# resume
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection.handshakeClientCert(serverName=address[0], session=session,
settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
assert(connection.session.serverName == address[0])
assert(connection.resumed)
assert(connection.encryptThenMAC)
connection.close()
test_no += 1
print("Test {0} - resumption with no EtM in 2nd handshake".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.macNames.remove("aead")
settings.maxVersion = (3, 3)
connection.handshakeClientCert(serverName=address[0], settings=settings)
testConnClient(connection)
assert(isinstance(connection.session.serverCertChain, X509CertChain))
assert(connection.session.serverName == address[0])
assert(not connection.resumed)
assert(connection.encryptThenMAC)
connection.close()
session = connection.session
# resume
synchro.recv(1)
settings = HandshakeSettings()
settings.useEncryptThenMAC = False
settings.macNames.remove("aead")
settings.maxVersion = (3, 3)
connection = connect()
try:
connection.handshakeClientCert(serverName=address[0], session=session,
settings=settings)
assert False
except TLSRemoteAlert as e:
assert(str(e) == "illegal_parameter")
else:
raise AssertionError("No exception raised")
connection.close()
test_no += 1
print("Test {0} - resumption in TLSv1.3".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
# force HRR
settings.keyShares = []
connection.handshakeClientCert(serverName=address[0], settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverName == address[0]
assert not connection.resumed
assert connection.session.tickets
connection.close()
session = connection.session
# resume
synchro.recv(1)
settings = HandshakeSettings()
settings.keyShares = []
connection = connect()
connection.handshakeClientCert(serverName=address[0], session=session,
settings=settings)
testConnClient(connection)
assert connection.resumed
connection.close()
test_no += 1
print("Test {0} - resumption in TLSv1.3 with mutual X.509".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,4)
# force HRR
settings.keyShares = []
connection.handshakeClientCert(x509Chain, x509Key, serverName=address[0],
settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverName == address[0]
assert not connection.resumed
assert connection.session.tickets
connection.close()
session = connection.session
# resume
synchro.recv(1)
settings = HandshakeSettings()
settings.minVersion = (3,4)
settings.keyShares = []
connection = connect()
connection.handshakeClientCert(x509Chain, x509Key, serverName=address[0], session=session,
settings=settings)
testConnClient(connection)
assert connection.resumed
connection.close()
test_no += 1
print("Test {0} - resumption in TLSv1.3 with AES-CCM tickets".format(test_no))
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
# force HRR
settings.keyShares = []
connection.handshakeClientCert(serverName=address[0], settings=settings)
testConnClient(connection)
assert isinstance(connection.session.serverCertChain, X509CertChain)
assert connection.session.serverName == address[0]
assert not connection.resumed
assert connection.session.tickets
connection.close()
session = connection.session
# resume
synchro.recv(1)
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.keyShares = []
connection = connect()
connection.handshakeClientCert(serverName=address[0], session=session,
settings=settings)
testConnClient(connection)
assert connection.resumed
connection.close()
test_no += 1
print("Test {0} - Heartbeat extension response callback in TLSv1.2".format(test_no))
heartbeat_payload = os.urandom(50)
def heartbeat_response_check(message):
global received_payload
received_payload = message.payload
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
settings.heartbeat_response_callback = heartbeat_response_check
connection.handshakeClientCert(serverName=address[0], settings=settings)
connection.send_heartbeat_request(heartbeat_payload, 16)
testConnClient(connection)
testConnClient(connection)
connection.close()
assert heartbeat_payload == received_payload
test_no += 1
print("Test {0} - Heartbeat extension in TLSv1.3".format(test_no))
heartbeat_payload = os.urandom(50)
def heartbeat_response_check(message):
global received_payload
received_payload = message.payload
synchro.recv(1)
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 4)
settings.heartbeat_response_callback = heartbeat_response_check
connection.handshakeClientCert(serverName=address[0], settings=settings)
connection.send_heartbeat_request(heartbeat_payload, 16)
testConnClient(connection)
testConnClient(connection)
connection.close()
assert heartbeat_payload == received_payload
test_no += 1
print("Test {0} - KeyUpdate from client in TLSv1.3".format(test_no))
assert synchro.recv(1) == b'R'
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 4)
connection.handshakeClientCert(serverName=address[0], settings=settings)
assert synchro.recv(1) == b'K'
for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):
assert i in (0, 1)
assert synchro.recv(1) == b'K'
testConnClient(connection)
connection.close()
test_no += 1
print("Test {0} - mutual KeyUpdates in TLSv1.3".format(test_no))
assert synchro.recv(1) == b'R'
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 4)
connection.handshakeClientCert(serverName=address[0], settings=settings)
for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):
assert i in (0, 1)
testConnClient(connection)
synchro.send(b'R')
connection.close()
test_no += 1
print("Test {0} - multiple mutual KeyUpdates in TLSv1.3".format(test_no))
assert synchro.recv(1) == b'R'
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 4)
connection.handshakeClientCert(serverName=address[0], settings=settings)
for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):
assert i in (0, 1)
for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):
assert i in (0, 1)
testConnClient(connection)
synchro.send(b'R')
connection.close()
test_no += 1
print('Test {0} - good standard XMLRPC https client'.format(test_no))
address = address[0], address[1]+1
synchro.recv(1)
try:
# python 2.7.9 introduced certificate verification (context option)
# python 3.4.2 doesn't have it though
context = ssl.create_default_context(\
cafile=os.path.join(dir, "serverX509Cert.pem"))
server = xmlrpclib.Server('https://%s:%s' % address, context=context)
except (TypeError, AttributeError):
server = xmlrpclib.Server('https://%s:%s' % address)
synchro.recv(1)
assert server.add(1,2) == 3
synchro.recv(1)
assert server.pow(2,4) == 16
test_no += 1
print('Test {0} - good tlslite XMLRPC client'.format(test_no))
transport = XMLRPCTransport(ignoreAbruptClose=True)
server = xmlrpclib.Server('https://%s:%s' % address, transport)
synchro.recv(1)
assert server.add(1,2) == 3
synchro.recv(1)
assert server.pow(2,4) == 16
test_no += 1
print('Test {0} - good XMLRPC ignored protocol'.format(test_no))
server = xmlrpclib.Server('http://%s:%s' % address, transport)
synchro.recv(1)
assert server.add(1,2) == 3
synchro.recv(1)
assert server.pow(2,4) == 16
test_no += 1
print("Test {0} - Internet servers test".format(test_no))
try:
i = IMAP4_TLS("cyrus.andrew.cmu.edu")
i.login("anonymous", "anonymous@anonymous.net")
i.logout()
test_no += 1
print("Test {0}: IMAP4 good".format(test_no))
p = POP3_TLS("pop.gmail.com")
p.quit()
test_no += 1
print("Test {0}: POP3 good".format(test_no))
except (socket.error, socket.timeout) as e:
print("Non-critical error: socket error trying to reach internet "
"server: ", e)
synchro.close()
if not badFault:
print("Test succeeded, {0} good".format(test_no))
else:
print("Test failed")
def testConnServer(connection):
count = 0
while 1:
s = connection.read()
count += len(s)
if len(s) == 0:
break
connection.write(s)
if count == 1111:
break
def serverTestCmd(argv):
address = argv[0]
dir = argv[1]
#Split address into hostname/port tuple
address = address.split(":")
address = ( address[0], int(address[1]) )
#Create synchronisation FIFO
synchroSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
synchroSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
synchroSocket.bind((address[0], address[1]-1))
synchroSocket.listen(2)
#Connect to server
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
lsock.bind(address)
lsock.listen(5)
# following is blocking until the other side doesn't open
synchro = synchroSocket.accept()[0]
def connect():
s = lsock.accept()[0]
s.settimeout(15)
return TLSConnection(s)
with open(os.path.join(dir, "serverX509Cert.pem")) as f:
x509Cert = X509().parse(f.read())
x509Chain = X509CertChain([x509Cert])
with open(os.path.join(dir, "serverX509Key.pem")) as f:
x509Key = parsePEMKey(f.read(), private=True)
with open(os.path.join(dir, "serverRSAPSSSigCert.pem")) as f:
x509CertRSAPSSSig = X509().parse(f.read())
x509ChainRSAPSSSig = X509CertChain([x509CertRSAPSSSig])
with open(os.path.join(dir, "serverRSAPSSSigKey.pem")) as f:
x509KeyRSAPSSSig = parsePEMKey(f.read(), private=True)
with open(os.path.join(dir, "serverRSAPSSCert.pem")) as f:
x509CertRSAPSS = X509().parse(f.read())
x509ChainRSAPSS = X509CertChain([x509CertRSAPSS])
assert x509CertRSAPSS.certAlg == "rsa-pss"
with open(os.path.join(dir, "serverRSAPSSKey.pem")) as f:
x509KeyRSAPSS = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverECCert.pem")) as f:
x509CertECDSA = X509().parse(f.read())
x509ecdsaChain = X509CertChain([x509CertECDSA])
assert x509CertECDSA.certAlg == "ecdsa"
with open(os.path.join(dir, "serverECKey.pem")) as f:
x509ecdsaKey = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverP384ECCert.pem")) as f:
x509CertP384ECDSA = X509().parse(f.read())
x509ecdsaP384Chain = X509CertChain([x509CertP384ECDSA])
assert x509CertP384ECDSA.certAlg == "ecdsa"
with open(os.path.join(dir, "serverP384ECKey.pem")) as f:
x509ecdsaP384Key = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverP521ECCert.pem")) as f:
x509CertP521ECDSA = X509().parse(f.read())
x509ecdsaP521Chain = X509CertChain([x509CertP521ECDSA])
assert x509CertP521ECDSA.certAlg == "ecdsa"
with open(os.path.join(dir, "serverP521ECKey.pem")) as f:
x509ecdsaP521Key = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverBrainpoolP256r1ECCert.pem")) as f:
x509CertBrainpoolP256r1ECDSA = X509().parse(f.read())
x509ecdsaBrainpoolP256r1Chain = X509CertChain([x509CertBrainpoolP256r1ECDSA])
assert x509CertBrainpoolP256r1ECDSA.certAlg == "ecdsa"
with open(os.path.join(dir, "serverBrainpoolP256r1ECKey.pem")) as f:
x509ecdsaBrainpoolP256r1Key = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverBrainpoolP384r1ECCert.pem")) as f:
x509CertBrainpoolP384r1ECDSA = X509().parse(f.read())
x509ecdsaBrainpoolP384r1Chain = X509CertChain([x509CertBrainpoolP384r1ECDSA])
assert x509CertBrainpoolP384r1ECDSA.certAlg == "ecdsa"
with open(os.path.join(dir, "serverBrainpoolP384r1ECKey.pem")) as f:
x509ecdsaBrainpoolP384r1Key = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverBrainpoolP512r1ECCert.pem")) as f:
x509CertBrainpoolP512r1ECDSA = X509().parse(f.read())
x509ecdsaBrainpoolP512r1Chain = X509CertChain([x509CertBrainpoolP512r1ECDSA])
assert x509CertBrainpoolP512r1ECDSA.certAlg == "ecdsa"
with open(os.path.join(dir, "serverBrainpoolP512r1ECKey.pem")) as f:
x509ecdsaBrainpoolP512r1Key = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverRSANonCACert.pem")) as f:
x509CertRSANonCA = X509().parse(f.read())
x509ChainRSANonCA = X509CertChain([x509CertRSANonCA])
assert x509CertRSANonCA.certAlg == "rsa"
with open(os.path.join(dir, "serverRSANonCAKey.pem")) as f:
x509KeyRSANonCA = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverECDSANonCACert.pem")) as f:
x509CertECDSANonCA = X509().parse(f.read())
x509ChainECDSANonCA = X509CertChain([x509CertECDSANonCA])
assert x509CertECDSANonCA.certAlg == "ecdsa"
with open(os.path.join(dir, "serverECDSANonCAKey.pem")) as f:
x509KeyECDSANonCA = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverDSACert.pem")) as f:
x509CertDSA = X509().parse(f.read())
x509ChainDSA = X509CertChain([x509CertDSA])
assert x509CertDSA.certAlg == "dsa"
with open(os.path.join(dir, "serverDSAKey.pem")) as f:
x509KeyDSA = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverEd25519Cert.pem")) as f:
x509CertEd25519 = X509().parse(f.read())
x509Ed25519Chain = X509CertChain([x509CertEd25519])
assert x509CertEd25519.certAlg == "Ed25519"
with open(os.path.join(dir, "serverEd25519Key.pem")) as f:
x509Ed25519Key = parsePEMKey(f.read(), private=True,
implementations=["python"])
with open(os.path.join(dir, "serverEd448Cert.pem")) as f:
x509CertEd448 = X509().parse(f.read())
x509Ed448Chain = X509CertChain([x509CertEd448])
assert x509CertEd448.certAlg == "Ed448"
with open(os.path.join(dir, "serverEd448Key.pem")) as f:
x509Ed448Key = parsePEMKey(f.read(), private=True,
implementations=["python"])
test_no = 0
print("Test {0} - Anonymous server handshake".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(anon=True)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 (plus SNI)".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
assert connection.session.serverName == address[0]
assert connection.extendedMasterSecret
assert connection.session.appProto is None
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 TLSv1.2 (plus ALPN)".format(test_no))
synchro.send(b'R')
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
alpn=[b'http/1.1', b'http/1.0'],
settings=settings)
assert connection.session.serverName == address[0]
assert connection.extendedMasterSecret
assert connection.session.appProto == b'http/1.1'
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 TLSv1.3 (plus ALPN)".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
alpn=[b'http/1.1', b'http/1.0'])
assert connection.session.serverName == address[0]
assert connection.extendedMasterSecret
assert connection.session.appProto == b'http/1.1'
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509/w RSA-PSS sig".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509ChainRSAPSSSig,
privateKey=x509KeyRSAPSSSig)
assert(connection.extendedMasterSecret)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509/w RSA-PSS cert".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509ChainRSAPSS,
privateKey=x509KeyRSAPSS)
assert(connection.session.serverName == address[0])
assert(connection.extendedMasterSecret)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509/w RSA-PSS cert".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeServer(certChain=x509ChainRSAPSS,
privateKey=x509KeyRSAPSS,
settings=settings)
assert(connection.session.serverName == address[0])
assert(connection.extendedMasterSecret)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509, small record_size_limit".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.record_size_limit = 64
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509, SSLv3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,0)
settings.maxVersion = (3,0)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
assert(not connection.extendedMasterSecret)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 ECDSA, SSLv3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 0)
settings.maxVersion = (3, 0)
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
assert not connection.extendedMasterSecret
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 ECDSA, TLSv1.0".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 1)
settings.maxVersion = (3, 1)
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
assert connection.extendedMasterSecret
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 ECDSA, TLSv1.2".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
assert connection.extendedMasterSecret
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - mismatched ECDSA curve, TLSv1.2".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
try:
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
assert False
except TLSLocalAlert as e:
assert "curve in the public key is not supported by the client" in str(e)
connection.close()
test_no += 1
for curve, certChain, key in (("brainpoolP256r1", x509ecdsaBrainpoolP256r1Chain, x509ecdsaBrainpoolP256r1Key),
("brainpoolP384r1", x509ecdsaBrainpoolP384r1Chain, x509ecdsaBrainpoolP384r1Key),
("brainpoolP512r1", x509ecdsaBrainpoolP512r1Chain, x509ecdsaBrainpoolP512r1Key)):
print("Test {0} - Two good ECDSA certs - {1}, TLSv1.2".format(test_no, curve))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
settings.eccCurves = [curve, "secp256r1"]
settings.keyShares = []
v_host = VirtualHost()
v_host.keys = [Keypair(x509ecdsaKey, x509ecdsaChain.x509List)]
settings.virtual_hosts = [v_host]
connection.handshakeServer(certChain=certChain,
privateKey=key, settings=settings)
assert connection.extendedMasterSecret
assert connection.session.serverCertChain == certChain
testConnServer(connection)
connection.close()
test_no += 1
for curve, exp_chain in (("secp256r1", x509ecdsaChain),
("secp384r1", x509ecdsaP384Chain)):
print("Test {0} - Two good ECDSA certs - {1}, TLSv1.2"
.format(test_no, curve))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
v_host = VirtualHost()
v_host.keys = [Keypair(x509ecdsaKey, x509ecdsaChain.x509List)]
settings.virtual_hosts = [v_host]
connection.handshakeServer(certChain=x509ecdsaP384Chain,
privateKey=x509ecdsaP384Key, settings=settings)
assert connection.extendedMasterSecret
assert connection.session.serverCertChain == exp_chain
testConnServer(connection)
connection.close()
test_no += 1
for tls_ver in ("TLSv1.2", "TLSv1,3"):
print("Test {0} - good X509 RSA and ECDSA, correct RSA and ECDSA sigalgs, RSA, {1}"
.format(test_no, tls_ver))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 4)
v_host = VirtualHost()
v_host.keys = [Keypair(x509KeyECDSANonCA, x509ChainECDSANonCA.x509List)]
settings.virtual_hosts = [v_host]
connection.handshakeServer(certChain=x509ChainRSANonCA,
privateKey=x509KeyRSANonCA,
settings=settings)
assert connection.extendedMasterSecret
assert connection.session.serverCertChain == x509ChainRSANonCA
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X509 RSA and ECDSA, bad RSA and good ECDSA sigalgs, ECDSA, {1}"
.format(test_no, tls_ver))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 4)
v_host = VirtualHost()
v_host.keys = [Keypair(x509KeyECDSANonCA, x509ChainECDSANonCA.x509List)]
settings.virtual_hosts = [v_host]
connection.handshakeServer(certChain=x509ChainRSANonCA,
privateKey=x509KeyRSANonCA,
settings=settings)
assert connection.extendedMasterSecret
assert connection.session.serverCertChain == x509ChainECDSANonCA
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X509 RSA and ECDSA, bad RSA and ECDSA sigalgs, RSA, {1}"
.format(test_no, tls_ver))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 4)
v_host = VirtualHost()
v_host.keys = [Keypair(x509KeyECDSANonCA, x509ChainECDSANonCA.x509List)]
settings.virtual_hosts = [v_host]
connection.handshakeServer(certChain=x509ChainRSANonCA,
privateKey=x509KeyRSANonCA,
settings=settings)
assert connection.extendedMasterSecret
assert connection.session.serverCertChain == x509ChainRSANonCA
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 ECDSA, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
assert connection.extendedMasterSecret
testConnServer(connection)
connection.close()
test_no += 1
# check what happens when client doesn't advertise support for signature
# algoritm compatible with server key
print("Test {0} - mismatched ECDSA curve, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
try:
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, settings=settings)
assert False
except TLSLocalAlert as e:
assert "No common signature algorithms" in str(e)
connection.close()
test_no += 1
print("Test {0} - good X.509 P-384 ECDSA, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509ecdsaP384Chain,
privateKey=x509ecdsaP384Key, settings=settings)
assert connection.extendedMasterSecret
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 P-521 ECDSA, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509ecdsaP521Chain,
privateKey=x509ecdsaP521Key, settings=settings)
assert connection.extendedMasterSecret
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 Ed25519, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509Ed25519Chain,
privateKey=x509Ed25519Key, settings=settings)
assert connection.extendedMasterSecret
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 Ed448, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509Ed448Chain,
privateKey=x509Ed448Key, settings=settings)
assert connection.extendedMasterSecret
testConnServer(connection)
connection.close()
test_no += 1
for prot in ["TLSv1.3", "TLSv1.2"]:
for c_type, exp_chain in (("rsa", x509Chain),
("ecdsa", x509ecdsaChain)):
print("Test {0} - good RSA and ECDSA, {2}, {1}"
.format(test_no, c_type, prot))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 4)
v_host = VirtualHost()
v_host.keys = [Keypair(x509ecdsaKey, x509ecdsaChain.x509List)]
settings.virtual_hosts = [v_host]
connection.handshakeServer(certChain=x509Chain,
privateKey=x509Key, settings=settings)
assert connection.extendedMasterSecret
assert connection.session.serverCertChain == exp_chain
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509, mismatched key_share".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1"]
settings.keyShares = ["secp256r1"]
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509, RC4-MD5".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.macNames = ["sha", "md5"]
settings.cipherNames = ["rc4"]
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)
testConnServer(connection)
connection.close()
if tackpyLoaded:
tack = Tack.createFromPem(
open(os.path.join(dir, "TACK1.pem"), "rU").read())
tackUnrelated = Tack.createFromPem(
open(os.path.join(dir, "TACKunrelated.pem"), "rU").read())
settings = HandshakeSettings()
settings.useExperimentalTackExtension = True
test_no += 1
print("Test {0} - good X.509, TACK".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
tacks=[tack], activationFlags=1, settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509, TACK unrelated to cert chain".\
format(test_no))
synchro.send(b'R')
connection = connect()
try:
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
tacks=[tackUnrelated], settings=settings)
assert False
except TLSRemoteAlert as alert:
if alert.description != AlertDescription.illegal_parameter:
raise
else:
test_no += 1
print("Test {0} - good X.509, TACK...skipped (no tackpy)".\
format(test_no))
test_no += 1
print("Test {0} - good X.509, TACK unrelated to cert chain"
"...skipped (no tackpy)".format(test_no))
test_no += 1
print("Test {0} - good PSK".format(test_no))
synchro.send(b'R')
settings = HandshakeSettings()
settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good PSK, no DH".format(test_no))
synchro.send(b'R')
settings = HandshakeSettings()
settings.psk_modes = ["psk_ke"]
settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good PSK, no DH, no cert".format(test_no))
synchro.send(b'R')
settings = HandshakeSettings()
settings.psk_modes = ["psk_ke"]
settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]
connection = connect()
connection.handshakeServer(settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good SRP (db)".format(test_no))
try:
import logging
logging.basicConfig(level=logging.DEBUG)
(db_file, db_name) = mkstemp()
print("server {0} - tmp file created".format(time.time()))
os.close(db_file)
print("server {0} - tmp file closed".format(time.time()))
# this is race'y but the interface dbm interface is stupid like that...
os.remove(db_name)
print("server {0} - tmp file removed".format(time.time()))
verifierDB = VerifierDB(db_name)
print("server {0} - verifier initialised".format(time.time()))
verifierDB.create()
print("server {0} - verifier created".format(time.time()))
entry = VerifierDB.makeVerifier("test", "password", 1536)
print("server {0} - entry created".format(time.time()))
verifierDB[b"test"] = entry
print("server {0} - entry added".format(time.time()))
synchro.send(b'R')
print("server {0} - synchro sent".format(time.time()))
connection = connect()
connection.handshakeServer(verifierDB=verifierDB)
testConnServer(connection)
connection.close()
finally:
try:
os.remove(db_name)
except FileNotFoundError:
# dbm module may create files with different names depending on
# platform
os.remove(db_name + ".dat")
test_no += 1
print("Test {0} - good SRP".format(test_no))
verifierDB = VerifierDB()
verifierDB.create()
entry = VerifierDB.makeVerifier("test", "password", 1536)
verifierDB[b"test"] = entry
synchro.send(b'R')
connection = connect()
connection.handshakeServer(verifierDB=verifierDB)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - SRP faults".format(test_no))
for fault in Fault.clientSrpFaults + Fault.genericFaults:
synchro.send(b'R')
connection = connect()
connection.fault = fault
connection.handshakeServer(verifierDB=verifierDB)
connection.close()
test_no += 1
print("Test {0} - good SRP: with X.509 certificate, TLSv1.0".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(verifierDB=verifierDB, \
certChain=x509Chain, privateKey=x509Key)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - X.509 with SRP faults".format(test_no))
for fault in Fault.clientSrpFaults + Fault.genericFaults:
synchro.send(b'R')
connection = connect()
connection.fault = fault
connection.handshakeServer(verifierDB=verifierDB, \
certChain=x509Chain, privateKey=x509Key)
connection.close()
test_no += 1
print("Test {0} - X.509 faults".format(test_no))
for fault in Fault.clientNoAuthFaults + Fault.genericFaults:
synchro.send(b'R')
connection = connect()
connection.fault = fault
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
connection.close()
test_no += 1
print("Test {0} - good mutual X.509".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
testConnServer(connection)
assert(isinstance(connection.session.clientCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - good mutual ECDSA X.509".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509ecdsaChain,
privateKey=x509ecdsaKey, reqCert=True)
testConnServer(connection)
assert(isinstance(connection.session.clientCertChain, X509CertChain))
assert len(connection.session.clientCertChain.getEndEntityPublicKey()) ==\
256
connection.close()
test_no += 1
print("Test {0} - good mutual Ed25519 X.509".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509Ed25519Chain,
privateKey=x509Ed25519Key, reqCert=True)
testConnServer(connection)
assert(isinstance(connection.session.clientCertChain, X509CertChain))
assert connection.session.clientCertChain.getEndEntityPublicKey().key_type\
== "Ed25519"
connection.close()
test_no += 1
print("Test {0} - good mutual Ed25519 X.509, TLS 1.2".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeServer(certChain=x509Ed25519Chain,
privateKey=x509Ed25519Key, reqCert=True,
settings=settings)
testConnServer(connection)
assert(isinstance(connection.session.clientCertChain, X509CertChain))
assert connection.session.clientCertChain.getEndEntityPublicKey().key_type\
== "Ed25519"
connection.close()
test_no += 1
print("Test {0} - good X.509 DSA, SSLv3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 0)
settings.maxVersion = (3, 0)
connection.handshakeServer(certChain=x509ChainDSA,
privateKey=x509KeyDSA, settings=settings)
assert not connection.extendedMasterSecret
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 DSA, TLSv1.2".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeServer(certChain=x509ChainDSA,
privateKey=x509KeyDSA, settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 Ed25519, TLSv1.2".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeServer(certChain=x509Ed25519Chain,
privateKey=x509Ed25519Key, settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good X.509 Ed448, TLSv1.2".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 3)
settings.maxVersion = (3, 3)
connection.handshakeServer(certChain=x509Ed448Chain,
privateKey=x509Ed448Key, settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, TLSv1.3 no certs".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,4)
settings.maxVersion = (3,4)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
testConnServer(connection)
assert not connection.session.clientCertChain
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,4)
settings.maxVersion = (3,4)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
testConnServer(connection)
assert isinstance(connection.session.clientCertChain, X509CertChain)
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, PHA, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
assert connection.session.clientCertChain is None
for result in connection.request_post_handshake_auth(settings):
assert result in (0, 1)
synchro.send(b'R')
testConnServer(connection)
assert connection.session.clientCertChain is not None
assert isinstance(connection.session.clientCertChain, X509CertChain)
connection.close()
test_no += 1
print("Test {0} - good mutual X.509 Ed25519, PHA, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509Ed25519Chain,
privateKey=x509Ed25519Key,
settings=settings)
assert connection.session.clientCertChain is None
for result in connection.request_post_handshake_auth(settings):
assert result in (0, 1)
synchro.send(b'R')
testConnServer(connection)
assert connection.session.clientCertChain is not None
assert isinstance(connection.session.clientCertChain, X509CertChain)
assert connection.session.clientCertChain.getEndEntityPublicKey().key_type\
== "Ed25519"
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, PHA and KeyUpdate, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
assert connection.session.clientCertChain is None
for result in connection.request_post_handshake_auth(settings):
assert result in (0, 1)
synchro.send(b'R')
assert connection.read(0, 0) == b''
assert connection.session.clientCertChain is not None
assert isinstance(connection.session.clientCertChain, X509CertChain)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - mutual X.509, PHA, no client cert, TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
connection.client_cert_required = True
assert connection.session.clientCertChain is None
for result in connection.request_post_handshake_auth(settings):
assert result in (0, 1)
synchro.send(b'R')
try:
testConnServer(connection)
assert False
except TLSLocalAlert as e:
assert "Client did not provide a certificate in post-handshake" in \
str(e)
assert e.description == AlertDescription.certificate_required
assert connection.session.clientCertChain is None
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, TLSv1.1".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,2)
settings.maxVersion = (3,2)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
testConnServer(connection)
assert(isinstance(connection.session.clientCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - good mutual X.509, SSLv3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,0)
settings.maxVersion = (3,0)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)
testConnServer(connection)
assert(isinstance(connection.session.clientCertChain, X509CertChain))
connection.close()
test_no += 1
print("Test {0} - mutual X.509 faults".format(test_no))
for fault in Fault.clientCertFaults + Fault.genericFaults:
synchro.send(b'R')
connection = connect()
connection.fault = fault
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)
connection.close()
test_no += 1
print("Test {0} - good SRP, prepare to resume".format(test_no))
synchro.send(b'R')
sessionCache = SessionCache()
connection = connect()
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
assert(connection.session.serverName == address[0])
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - resumption (plus SNI)".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
assert(connection.session.serverName == address[0])
testConnServer(connection)
#Don't close! -- see next test
test_no += 1
print("Test {0} - invalidated resumption (plus SNI)".format(test_no))
synchro.send(b'R')
try:
connection.read(min=1, max=1)
assert False #Client is going to close the socket without a close_notify
except TLSAbruptCloseError as e:
pass
synchro.send(b'R')
connection = connect()
try:
connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)
assert False
except TLSLocalAlert as alert:
if alert.description != AlertDescription.bad_record_mac:
raise
connection.close()
test_no += 1
print("Test {0} - HTTPS test X.509".format(test_no))
#Close the current listening socket
lsock.close()
#Create and run an HTTP Server using TLSSocketServerMixIn
class MyHTTPServer(TLSSocketServerMixIn,
HTTPServer):
def handshake(self, tlsConnection):
tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
return True
def server_bind(self):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
HTTPServer.server_bind(self)
cd = os.getcwd()
os.chdir(dir)
address = address[0], address[1]+1
httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)
for x in range(6):
synchro.send(b'R')
httpd.handle_request()
httpd.server_close()
cd = os.chdir(cd)
#Re-connect the listening socket
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
address = address[0], address[1]+1
lsock.bind(address)
lsock.listen(5)
implementations = []
if m2cryptoLoaded:
implementations.append("openssl")
if pycryptoLoaded:
implementations.append("pycrypto")
implementations.append("python")
test_no += 1
print("Test {0} - different ciphers, TLSv1.0".format(test_no))
for implementation in ["python"] * len(implementations):
for cipher in ["aes128", "aes256", "rc4"]:
test_no += 1
print("Test {0}:".format(test_no), end=' ')
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
print(connection.getCipherName(), connection.getCipherImplementation())
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - throughput test".format(test_no))
for implementation in implementations:
for cipher in ["aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8",
"aes128gcm", "aes256gcm", "aes128", "aes256", "3des",
"rc4", "chacha20-poly1305_draft00",
"chacha20-poly1305"]:
# skip tests with implementations that don't support them
if cipher == "3des" and implementation not in ("openssl",
"pycrypto"):
continue
if cipher in ("aes128gcm", "aes256gcm") and \
implementation not in ("pycrypto",
"python", "openssl"):
continue
if cipher in ("aes128ccm", "aes128ccm_8",
"aes256ccm", "aes256ccm_8") and \
implementation not in ("python", "openssl"):
continue
if cipher in ("chacha20-poly1305_draft00", "chacha20-poly1305") \
and implementation not in ("python", ):
continue
test_no += 1
print("Test {0}:".format(test_no), end=' ')
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.cipherNames = [cipher]
settings.cipherImplementations = [implementation, "python"]
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
print(connection.getCipherName(), connection.getCipherImplementation())
h = connection.read(min=50000, max=50000)
assert(h == b"hello"*10000)
connection.write(h)
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Server Negotiation".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings, nextProtos=[b"http/1.1"])
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Server Negotiation".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Server Negotiation".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings, nextProtos=[b"http/1.1", b"spdy/2"])
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Server Negotiation".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Server Negotiation".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"])
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Server Negotiation".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings, nextProtos=[b"spdy/3", b"spdy/2"])
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - Next-Protocol Server Negotiation".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings, nextProtos=[])
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - FALLBACK_SCSV".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
# TODO fix FALLBACK with TLS1.3
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - FALLBACK_SCSV".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
# TODO fix FALLBACK with TLS1.3
settings.maxVersion = (3, 3)
try:
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
assert False
except TLSLocalAlert as alert:
if alert.description != AlertDescription.inappropriate_fallback:
raise
connection.close()
test_no += 1
print("Test {0} - no EtM server side".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.useEncryptThenMAC = False
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - no EtM client side".format(test_no))
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - resumption with EtM".format(test_no))
synchro.send(b'R')
sessionCache = SessionCache()
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
sessionCache=sessionCache)
testConnServer(connection)
connection.close()
# resume
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
sessionCache=sessionCache)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - resumption with no EtM in 2nd handshake".format(test_no))
synchro.send(b'R')
sessionCache = SessionCache()
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
sessionCache=sessionCache)
testConnServer(connection)
connection.close()
# resume
synchro.send(b'R')
connection = connect()
try:
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
sessionCache=sessionCache)
assert False
except TLSLocalAlert as e:
assert(str(e) == "illegal_parameter")
else:
raise AssertionError("no exception raised")
connection.close()
test_no += 1
print("Test {0} - resumption in TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,4)
settings.ticketKeys = [getRandomBytes(32)]
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
testConnServer(connection)
connection.close()
# resume
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - resumption in TLSv1.3 with mutual X.509".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3,4)
settings.ticketKeys = [getRandomBytes(32)]
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
reqCert=True, settings=settings)
testConnServer(connection)
connection.close()
# resume
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
reqCert=True, settings=settings)
testConnServer(connection)
assert connection.session.clientCertChain
connection.close()
test_no += 1
print("Test {0} - resumption in TLSv1.3 with AES-CCM tickets".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.minVersion = (3, 4)
settings.ticketKeys = [getRandomBytes(32)]
settings.ticketCipher = "aes128ccm"
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
testConnServer(connection)
connection.close()
# resume
synchro.send(b'R')
connection = connect()
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - Heartbeat extension response callback in TLSv1.2".format(test_no))
heartbeat_payload = os.urandom(50)
def heartbeat_response_check(message):
global received_payload
received_payload = message.payload
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 3)
settings.heartbeat_response_callback = heartbeat_response_check
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
connection.send_heartbeat_request(heartbeat_payload, 16)
testConnServer(connection)
testConnServer(connection)
connection.close()
assert heartbeat_payload == received_payload
test_no += 1
print("Test {0} - Heartbeat extension in TLSv1.3".format(test_no))
heartbeat_payload = os.urandom(50)
def heartbeat_response_check(message):
global received_payload
received_payload = message.payload
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 4)
settings.heartbeat_response_callback = heartbeat_response_check
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
connection.send_heartbeat_request(heartbeat_payload, 16)
testConnServer(connection)
testConnServer(connection)
connection.close()
assert heartbeat_payload == received_payload
test_no += 1
print("Test {0} - KeyUpdate from client in TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
synchro.send(b'K')
synchro.send(b'K')
testConnServer(connection)
connection.close()
test_no += 1
print("Test {0} - mutual KeyUpdates in TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):
assert i in (0, 1)
testConnServer(connection)
assert synchro.recv(1) == b'R'
connection.close()
test_no += 1
print("Test {0} - multiple mutual KeyUpdates in TLSv1.3".format(test_no))
synchro.send(b'R')
connection = connect()
settings = HandshakeSettings()
settings.maxVersion = (3, 4)
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,
settings=settings)
for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):
assert i in (0, 1)
for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):
assert i in (0, 1)
testConnServer(connection)
assert synchro.recv(1) == b'R'
connection.close()
test_no += 1
print("Tests {0}-{1} - XMLRPXC server".format(test_no, test_no + 2))
address = address[0], address[1]+1
class Server(TLSXMLRPCServer):
def handshake(self, tlsConnection):
try:
tlsConnection.handshakeServer(certChain=x509Chain,
privateKey=x509Key,
sessionCache=sessionCache)
tlsConnection.ignoreAbruptClose = True
return True
except TLSError as error:
print("Handshake failure:", str(error))
return False
class MyFuncs:
def pow(self, x, y): return pow(x, y)
def add(self, x, y): return x + y
server = Server(address)
server.register_instance(MyFuncs())
synchro.send(b'R')
#sa = server.socket.getsockname()
#print "Serving HTTPS on", sa[0], "port", sa[1]
for i in range(6):
synchro.send(b'R')
server.handle_request()
synchro.close()
synchroSocket.close()
test_no += 2
print("Test succeeded")
if __name__ == '__main__':
if len(sys.argv) < 2:
printUsage("Missing command")
elif sys.argv[1] == "client"[:len(sys.argv[1])]:
clientTestCmd(sys.argv[2:])
elif sys.argv[1] == "server"[:len(sys.argv[1])]:
serverTestCmd(sys.argv[2:])
else:
printUsage("Unknown command: %s" % sys.argv[1])