Revision dbd56c149072e656ca8d6a43a59588f3e7513da2 authored by Hubert Kario on 29 September 2021, 13:05:34 UTC, committed by GitHub on 29 September 2021, 13:05:34 UTC
ExpectAlert - add __repr__
combinatorial.py
from __future__ import print_function
import os
import csv
import sys
import time
import getopt
import socket
import threading
import traceback
import subprocess
try:
import queue
except ImportError:
import Queue as queue
from tlslite.handshakesettings import HandshakeSettings
from tlslite.constants import CipherSuite, ExtensionType, SignatureScheme, \
SignatureAlgorithm, HashAlgorithm, AlertDescription, AlertLevel, \
GroupName, ECPointFormat
from tlslite.extensions import TLSExtension, SNIExtension, \
RenegotiationInfoExtension, SignatureAlgorithmsExtension, \
SupportedGroupsExtension, StatusRequestExtension, \
ECPointFormatsExtension
from tlslite.utils.cryptomath import getRandomBytes
from tlslite.utils.keyfactory import parsePEMKey
from tlslite.x509 import X509
from tlslite.x509certchain import X509CertChain
from tlsfuzzer.messages import Connect, ClientHelloGenerator, \
CertificateGenerator, CertificateVerifyGenerator, \
ClientKeyExchangeGenerator, ChangeCipherSpecGenerator, \
FinishedGenerator, AlertGenerator, ResetRenegotiationInfo, \
ResetHandshakeHashes
from tlsfuzzer.expect import ExpectServerHello, ExpectCertificate, \
ExpectServerHelloDone, ExpectServerKeyExchange, \
ExpectCertificateRequest, ExpectChangeCipherSpec, ExpectFinished, \
ExpectAlert, ExpectClose
from tlsfuzzer.runner import Runner
# Buffer of captured server output lines: filled by the reader threads running
# process_stdout()/process_stderr() and drained (printed) by flush_queue().
out = queue.Queue()
def help_msg():
    """Print the command line usage summary to standard output.

    Lists every option accepted by the script's getopt string
    ("hi:k:c:D:") and the optional positional test case numbers.
    """
    print("Usage <script-name> -i definitions.csv")
    print("-i file name of the CSV file generated by ACTS with")
    print(" definitions of all the test cases to run")
    print("-k file name of the private key used for client")
    print(" certificate authentication")
    print("-c file name of the client certificate")
    print("-D directory with the server keys, certificates and DH")
    print(" parameters ('.' by default)")
    print("-h print this message")
    print("[number ...] run only the test cases with the given numbers")
def skip_comments(f):
    """Skip the comments (lines starting with '#') in CSV file.

    Consumes every leading line of *f* that begins with ``#`` and leaves the
    file position at the start of the first line that does not, so that a
    csv reader created afterwards sees the header row first.

    *f* must support ``tell()``, ``seek()``, ``read()`` and ``readline()``.
    Files opened in text mode as well as in binary mode are handled.
    """
    while True:
        pos = f.tell()
        first = f.read(1)
        # Compare against a marker of the same type as the data: on Python 3
        # a str never equals a bytes object, so a hard-coded b'#' silently
        # fails to match anything read from a text-mode file.
        marker = b'#' if isinstance(first, bytes) else u'#'
        if first != marker:
            # not a comment (or end of file): un-read the character
            f.seek(pos)
            return
        # discard the remainder of the comment line
        f.readline()
def conv_generator(conf, host, port, sni_hostname, cert=None, key=None):
"""Generate a conversation based on dict with configuration.

The conversation consists of two handshakes. The first one is configured
by the un-prefixed keys of conf ('Cipher', 'CH_compression', ...), the
second one by the matching keys with the 'H2' prefix ('H2Cipher',
'H2CH_compression', ...).

conf -- dict mapping option names to string values (the caller in this
file passes rows read with csv.DictReader)
host, port -- passed to Connect()
sni_hostname -- name placed in the server_name extension
cert -- certificate wrapped in X509CertChain() when a client certificate
is sent together with a CertificateVerify
key -- passed to CertificateVerifyGenerator()

Returns the root Connect node of the conversation.
Raises ValueError when an option has a value that is not handled here.
"""
# root of the conversation; all later nodes are chained with add_child()
root = Connect(host, port)
# the settings object is only used as a filter for the
# CipherSuite.get*Suites() helpers below
hs = HandshakeSettings()
# only RSA is supported
# NOTE(review): only a warning is printed here, no value in conf is changed
if conf['Server_authentication'] != "RSA" and \
conf['Server_authentication'] != "anon":
print("Substituting {0} to RSA for server auth"
.format(conf['Server_authentication']), file=sys.stderr)
# get the cipher that matches the imposed restrictions
# (translation of the names used in conf to the names used in cipherNames)
cipher_trans = {"AES_128_CBC": "aes128",
"AES_256_CBC": "aes256",
"AES_128_GCM": "aes128gcm",
"AES_256_GCM": "aes256gcm",
"3DES_EDE_CBC": "3des",
"RC4": "rc4",
"Chacha20_Poly1305": "chacha20-poly1305"}
hs.cipherNames = [cipher_trans.get(conf['Cipher'], None)]
if hs.cipherNames == [None]:
raise ValueError("Unknown cipher type: {0}".format(conf['Cipher']))
# same kind of translation for the integrity (MAC) algorithm
mac_trans = {"AEAD": "aead",
"MD5_HMAC": "md5",
"SHA1_HMAC": "sha",
"SHA256_HMAC": "sha256",
"SHA384_HMAC": "sha384"}
hs.macNames = [mac_trans.get(conf['Integrity'], None)]
if hs.macNames == [None]:
raise ValueError("Unknown integrity type: {0}"
.format(conf['Integrity']))
# select the list of cipher suites from key exchange and authentication;
# the "anon" combinations have to be tested before the plain DHE/ECDHE ones
if conf['Key_exchange'] == 'DHE' and \
conf['Server_authentication'] == "anon":
suites = CipherSuite.getAnonSuites(hs)
elif conf['Key_exchange'] == 'ECDHE' and \
conf['Server_authentication'] == "anon":
suites = CipherSuite.getEcdhAnonSuites(hs)
elif conf['Key_exchange'] == 'RSA':
suites = CipherSuite.getCertSuites(hs)
elif conf['Key_exchange'] == 'DHE':
suites = CipherSuite.getDheCertSuites(hs)
elif conf['Key_exchange'] == 'ECDHE':
suites = CipherSuite.getEcdheCertSuites(hs)
else:
raise ValueError("Unknown key exchange type: {0}"
.format(conf['Key_exchange']))
if not suites:
raise ValueError("Couldn't find matching cipher for {0} {3} {1} {2}"
.format(conf['Key_exchange'],
conf['Cipher'],
conf['Integrity'],
conf['Server_authentication']))
# session ID/resumption handling
if conf['CH_SessionID'] == 'random':
sess_ID = getRandomBytes(16)
elif conf['CH_SessionID'] == 'empty':
sess_ID = bytearray()
else:
# NOTE(review): the message has no "{0}" placeholder, so the offending
# value passed to format() never shows up in the error text
raise ValueError("Unknown CH_SessionID value"
.format(conf['CH_SessionID']))
# compression
if conf['CH_compression'] == 'null_only':
compress = [0]
elif conf['CH_compression'] == 'null_and_deflate':
compress = [0, 1]
else:
raise ValueError("Unknown CH_compression value: {0}"
.format(conf['CH_compression']))
# Renegotiation Info
# (position of the SCSV in the list of offered cipher suites)
if conf['CH_renegotiation_info_SCSV'] == "first":
suites.insert(0, CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV)
elif conf['CH_renegotiation_info_SCSV'] == "last":
suites.append(CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV)
elif conf['CH_renegotiation_info_SCSV'] == "absent":
pass
elif conf['CH_renegotiation_info_SCSV'] == "second":
# the SCSV is appended and then followed by one more value, so it ends
# up second from the end of the list
suites.append(CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV)
suites.append(0xeaea) # GREASE
else:
raise ValueError("Unexpected CH_renegotiation_info_SCSV value: {0}"
.format(conf['CH_renegotiation_info_SCSV']))
# whether to send extensions
if conf['CH_extensions_present'] == "false":
ext = None
elif conf['CH_extensions_present'] != "true":
raise ValueError("Unexpected CH_extensions_present value: {0}"
.format(conf['CH_extensions_present']))
else:
ext = dict()
# session ticket
if conf['CH_session_ticket'] != "no_ext":
print("Not generating session ticket extension", file=sys.stderr)
# renegotiation info
if conf['CH_renegotiation_info_ext'] == "true":
ext[ExtensionType.renegotiation_info] = \
RenegotiationInfoExtension().create(bytearray())
elif conf['CH_renegotiation_info_ext'] == "false":
pass
else:
raise ValueError("Unknown option in CH_renegotiation_info_ext: {0}"
.format(conf['CH_renegotiation_info_ext']))
# signature algorithms
if conf['CH_signature_algorithms_ext'] == "false":
pass
elif conf['CH_signature_algorithms_ext'] != "true":
raise ValueError("Unknown option CH_signature_algorithms_ext: {0}"
.format(conf["CH_signature_algorithms_ext"]))
else:
# the advertised scheme is derived from the SKE_signature_scheme value
sig = conf['SKE_signature_scheme']
if sig == "none" or sig == "no_message":
# enter some random ones:
ext[ExtensionType.signature_algorithms] = \
SignatureAlgorithmsExtension()\
.create([SignatureScheme.rsa_pkcs1_sha256,
SignatureScheme.rsa_pss_sha256])
else:
# rewrite the name from conf into a SignatureScheme attribute name;
# DSA and ECDSA schemes are replaced with RSA ones
if "dsa" in sig:
print("Changing {0} to RSA scheme".format(sig))
sig = sig.replace("ecdsa", "rsa")
sig = sig.replace("dsa", "rsa")
sig = sig.replace("rsa_sha", "rsa_pkcs1_sha")
sig = sig.replace("rsapss", "rsa_pss")
# sha224 is passed as a (hash, signature) pair instead of being looked
# up by name in SignatureScheme
if "sha224" in sig:
scheme = (HashAlgorithm.sha224, SignatureAlgorithm.rsa)
else:
scheme = getattr(SignatureScheme, sig)
ext[ExtensionType.signature_algorithms] = \
SignatureAlgorithmsExtension()\
.create([scheme])
# supported groups extension
# NOTE(review): the "false" branch still adds a supported_groups
# extension with a fixed list of groups - confirm this is intended
if conf['CH_supported_groups_ext'] == "false":
groups = [GroupName.ffdhe2048, GroupName.secp256r1,
GroupName.x25519]
ext[ExtensionType.supported_groups] = \
SupportedGroupsExtension().create(groups)
pass
elif conf['CH_supported_groups_ext'] != "true":
raise ValueError("Unknown option in CH_supported_groups_ext: {0}"
.format(conf['CH_supported_groups_ext']))
else:
if conf['SKE_dh_group'] == "no_message":
groups = [GroupName.ffdhe2048, GroupName.secp256r1,
GroupName.x25519]
elif conf['SKE_dh_group'] == "ffdhe1024":
# ffdhe1024 is not looked up in GroupName; only the two EC groups
# are advertised in this case
groups = [GroupName.secp256r1, GroupName.x25519]
else:
groups = [getattr(GroupName, conf['SKE_dh_group'])]
ext[ExtensionType.supported_groups] = \
SupportedGroupsExtension().create(groups)
ext[ExtensionType.ec_point_formats] = \
ECPointFormatsExtension()\
.create([ECPointFormat.uncompressed,
ECPointFormat.ansiX962_compressed_char2,
ECPointFormat.ansiX962_compressed_prime])
# encrypt then MAC
if conf['CH_encrypt_then_mac_ext'] == "false":
pass
elif conf['CH_encrypt_then_mac_ext'] != "true":
raise ValueError("Unknown option in CH_encrypt_then_mac_ext: {0}"
.format(conf['CH_encrypt_then_mac_ext']))
else:
ext[ExtensionType.encrypt_then_mac] = \
TLSExtension(extType=ExtensionType.encrypt_then_mac)\
.create(bytearray(0))
# server name
if conf['CH_server_name'] == "no_ext":
pass
elif conf['CH_server_name'] == "correct":
ext[ExtensionType.server_name] = \
SNIExtension().create(sni_hostname)
elif conf['CH_server_name'] == "mismatch":
# NOTE(review): sni_hostname is concatenated with a bytes literal; the
# caller in this file passes the str 'localhost', which on Python 3
# raises TypeError here - confirm the expected type
ext[ExtensionType.server_name] = \
SNIExtension().create(sni_hostname + b'.www')
else:
raise ValueError("Unknown option in CH_server_name: {0}"
.format(conf['CH_server_name']))
# OCSP staple
if conf['CH_status_request_ext'] == "false":
pass
elif conf['CH_status_request_ext'] != "true":
raise ValueError("Unknown option in CH_status_request_ext: {0}"
.format(conf['CH_status_request_ext']))
else:
ext[ExtensionType.status_request] = \
StatusRequestExtension().create()
# Extended Master Secret ext
if conf['CH_extended_master_secret_ext'] == "false":
pass
elif conf['CH_extended_master_secret_ext'] != "true":
raise ValueError(("Unknown value in CH_extended_master_secret_ext"
": {0}")
.format(conf['CH_extended_master_secret_ext']))
else:
ext[ExtensionType.extended_master_secret] = \
TLSExtension(extType=ExtensionType.extended_master_secret)\
.create(bytearray())
#
#
# ClientHello of the first handshake
node = root.add_child(ClientHelloGenerator(suites,
session_id=sess_ID,
compression=compress,
extensions=ext))
# with a mismatched server name an unrecognized_name warning alert is
# expected before ServerHello; the node is remembered so that ServerHello
# can be registered as its alternative below
if conf['CH_server_name'] == "mismatch":
node = node.add_child(ExpectAlert(AlertLevel.warning,
AlertDescription.unrecognized_name))
al_node = node
node = node.add_child(ExpectServerHello())
if conf['CH_server_name'] == "mismatch":
# make the sending of warning alert node optional
al_node.next_sibling = node
node = node.add_child(ExpectCertificate())
# TODO if conf['Certificate_Status_msg']
if conf['SKE_dh_group'] != "no_message":
node = node.add_child(ExpectServerKeyExchange())
if conf['CR_sent'] == "true":
node = node.add_child(ExpectCertificateRequest())
elif conf['CR_sent'] != "false":
raise ValueError("Unknown option in CR_sent: {0}"
.format(conf['CR_sent']))
node = node.add_child(ExpectServerHelloDone())
# client Certificate: created without arguments when no CertificateVerify
# is to be sent, with the provided cert otherwise
if conf['CR_sent'] == "true":
if conf['CV_signature_scheme'] == "no_message":
node = node.add_child(CertificateGenerator())
else:
node = node.add_child(CertificateGenerator(X509CertChain([cert])))
node = node.add_child(ClientKeyExchangeGenerator())
if conf['CV_signature_scheme'] != "no_message":
# same name rewriting as for the signature_algorithms extension above
sig = conf['CV_signature_scheme']
if "dsa" in sig:
print("Changing {0} to RSA scheme in CV".format(sig))
sig = sig.replace("ecdsa", "rsa")
sig = sig.replace("dsa", "rsa")
sig = sig.replace("rsa_sha", "rsa_pkcs1_sha")
sig = sig.replace("rsapss", "rsa_pss")
if "sha224" in sig:
scheme = (HashAlgorithm.sha224, SignatureAlgorithm.rsa)
else:
scheme = getattr(SignatureScheme, sig)
node = node.add_child(CertificateVerifyGenerator(key, msg_alg=scheme))
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(ExpectChangeCipherSpec())
node = node.add_child(ExpectFinished())
# Disconnect == "true": close_notify alerts are exchanged after the first
# handshake
if conf['Disconnect'] == "true":
node = node.add_child(AlertGenerator(AlertLevel.warning,
AlertDescription.close_notify))
node = node.add_child(ExpectAlert(AlertLevel.warning,
AlertDescription.close_notify))
# the same ExpectClose node is set as next_sibling of the ExpectAlert node
# and then also added as its child
node.next_sibling = ExpectClose()
node = node.add_child(node.next_sibling)
node = node.add_child(Connect(host, port))
node = node.add_child(ResetRenegotiationInfo())
node = node.add_child(ResetHandshakeHashes())
# ---- second handshake: same steps as above, driven by the H2* keys ----
hs = HandshakeSettings()
hs.cipherNames = [cipher_trans.get(conf['H2Cipher'], None)]
if hs.cipherNames == [None]:
raise ValueError("Unknown cipher type: {0}"
.format(conf['H2Cipher']))
hs.macNames = [mac_trans.get(conf["H2Integrity"], None)]
if hs.macNames == [None]:
raise ValueError("Unknown integrity type: {0}"
.format(conf['H2Integrity']))
if conf['H2Key_exchange'] == 'DHE' and \
conf['H2Server_authentication'] == "anon":
suites = CipherSuite.getAnonSuites(hs)
elif conf['H2Key_exchange'] == "ECDHE" and \
conf['H2Server_authentication'] == "anon":
suites = CipherSuite.getEcdhAnonSuites(hs)
elif conf['H2Key_exchange'] == "RSA":
suites = CipherSuite.getCertSuites(hs)
elif conf['H2Key_exchange'] == "DHE":
suites = CipherSuite.getDheCertSuites(hs)
elif conf['H2Key_exchange'] == "ECDHE":
suites = CipherSuite.getEcdheCertSuites(hs)
else:
raise ValueError("Unknown key exchange type: {0}"
.format(conf['H2Key_exchange']))
if not suites:
raise ValueError("Couldn't find matching cipher for {0} {3} {1} {2}"
.format(conf['H2Key_exchange'],
conf['H2Cipher'],
conf['H2Integrity'],
conf['H2Server_authentication']))
# session ID of the second ClientHello; unlike the first handshake the
# value "resume" is accepted as well
if conf['H2CH_SessionID'] == 'random':
sess_ID = getRandomBytes(16)
elif conf['H2CH_SessionID'] == 'empty':
sess_ID = bytearray()
elif conf['H2CH_SessionID'] == "resume":
# None is handed to ClientHelloGenerator(session_id=None); presumably the
# generator then reuses the session ID of the first handshake - TODO confirm
sess_ID = None
else:
raise ValueError("Unknown session id value: {0}"
.format(conf['H2CH_SessionID']))
# compression
if conf['H2CH_compression'] == 'null_only':
compress = [0]
elif conf['H2CH_compression'] == 'null_and_deflate':
compress = [0, 1]
else:
raise ValueError("Unknown H2CH_compression value: {0}"
.format(conf['H2CH_compression']))
# Renegotiation Info
if conf['H2CH_renegotiation_info_SCSV'] == "first":
suites.insert(0, CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV)
elif conf['H2CH_renegotiation_info_SCSV'] == "last":
suites.append(CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV)
elif conf['H2CH_renegotiation_info_SCSV'] == "absent":
pass
elif conf['H2CH_renegotiation_info_SCSV'] == "second":
suites.append(CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV)
suites.append(0xeaea) # GREASE
else:
raise ValueError("Unexpected H2CH_renegotiation_info_SCSV value: {0}"
.format(conf['H2CH_renegotiation_info_SCSV']))
# whether to send extensions
if conf['H2CH_extensions_present'] == "false":
ext = None
elif conf['H2CH_extensions_present'] != "true":
# NOTE(review): the message names CH_extensions_present although the
# value checked is H2CH_extensions_present
raise ValueError("Unexpected CH_extensions_present value: {0}"
.format(conf['H2CH_extensions_present']))
else:
ext = dict()
# session ticket
if conf['H2CH_session_ticket'] != "no_ext":
print("Not generating session ticket extension", file=sys.stderr)
# renegotiation info
if conf['H2CH_renegotiation_info_ext'] == "true":
# unlike the first handshake no extension object is created here;
# presumably None lets ClientHelloGenerator fill in the payload that
# matches the connection state - TODO confirm
ext[ExtensionType.renegotiation_info] = None
elif conf['H2CH_renegotiation_info_ext'] == "false":
pass
else:
raise ValueError("Unknown option in H2CH_renegotiation_info_ext: "
"{0}"
.format(conf['H2CH_renegotiation_info_ext']))
# signature algorithms
if conf['H2CH_signature_algorithms_ext'] == "false":
pass
elif conf['H2CH_signature_algorithms_ext'] != "true":
raise ValueError("Unknown option H2CH_signature_algorithms_ext: "
"{0}"
.format(conf["H2CH_signature_algorithms_ext"]))
else:
sig = conf['H2SKE_signature_scheme']
if sig == "none" or sig == "no_message":
# enter some random ones:
ext[ExtensionType.signature_algorithms] = \
SignatureAlgorithmsExtension()\
.create([SignatureScheme.rsa_pkcs1_sha256,
SignatureScheme.rsa_pss_sha256])
else:
if "dsa" in sig:
print("Changing {0} to RSA scheme".format(sig))
sig = sig.replace("ecdsa", "rsa")
sig = sig.replace("dsa", "rsa")
sig = sig.replace("rsa_sha", "rsa_pkcs1_sha")
sig = sig.replace("rsapss", "rsa_pss")
if "sha224" in sig:
scheme = (HashAlgorithm.sha224, SignatureAlgorithm.rsa)
else:
scheme = getattr(SignatureScheme, sig)
ext[ExtensionType.signature_algorithms] = \
SignatureAlgorithmsExtension()\
.create([scheme])
# supported groups extension
# NOTE(review): as in the first handshake, the "false" branch still adds
# the extension
if conf['H2CH_supported_groups_ext'] == "false":
groups = [GroupName.ffdhe2048, GroupName.secp256r1,
GroupName.x25519]
ext[ExtensionType.supported_groups] = \
SupportedGroupsExtension().create(groups)
pass
elif conf['H2CH_supported_groups_ext'] != "true":
raise ValueError("Unknown option in H2CH_supported_groups_ext: {0}"
.format(conf['H2CH_supported_groups_ext']))
else:
if conf['H2SKE_dh_group'] == "no_message":
groups = [GroupName.ffdhe2048, GroupName.secp256r1,
GroupName.x25519]
elif conf['H2SKE_dh_group'] == "ffdhe1024":
groups = [GroupName.secp256r1, GroupName.x25519]
else:
groups = [getattr(GroupName, conf['H2SKE_dh_group'])]
ext[ExtensionType.supported_groups] = \
SupportedGroupsExtension().create(groups)
ext[ExtensionType.ec_point_formats] = \
ECPointFormatsExtension()\
.create([ECPointFormat.uncompressed,
ECPointFormat.ansiX962_compressed_char2,
ECPointFormat.ansiX962_compressed_prime])
# encrypt then MAC
if conf['H2CH_encrypt_then_mac_ext'] == "false":
pass
elif conf['H2CH_encrypt_then_mac_ext'] != "true":
raise ValueError("Unknown option in H2CH_encrypt_then_mac_ext: {0}"
.format(conf['H2CH_encrypt_then_mac_ext']))
else:
ext[ExtensionType.encrypt_then_mac] = \
TLSExtension(extType=ExtensionType.encrypt_then_mac)\
.create(bytearray(0))
# server name
if conf['H2CH_server_name'] == "no_ext":
pass
elif conf['H2CH_server_name'] == "correct":
ext[ExtensionType.server_name] = \
SNIExtension().create(sni_hostname)
elif conf['H2CH_server_name'] == "mismatch":
# NOTE(review): same str + bytes concatenation as in the first handshake
ext[ExtensionType.server_name] = \
SNIExtension().create(sni_hostname + b'.www')
else:
raise ValueError("Unknown option in H2CH_server_name: {0}"
.format(conf['H2CH_server_name']))
# OCSP staple
if conf['H2CH_status_request_ext'] == "false":
pass
elif conf['H2CH_status_request_ext'] != "true":
raise ValueError("Unknown option in H2CH_status_request_ext: {0}"
.format(conf['H2CH_status_request_ext']))
else:
ext[ExtensionType.status_request] = \
StatusRequestExtension().create()
# Extended Master Secret ext
if conf['H2CH_extended_master_secret_ext'] == "false":
pass
elif conf['H2CH_extended_master_secret_ext'] != "true":
raise ValueError(("Unknown value in H2CH_extended_master_secret_ext"
": {0}")
.format(conf['H2CH_extended_master_secret_ext']))
else:
ext[ExtensionType.extended_master_secret] = \
TLSExtension(extType=ExtensionType.extended_master_secret)\
.create(bytearray())
# ClientHello of the second handshake
node = node.add_child(ClientHelloGenerator(suites,
session_id=sess_ID,
compression=compress,
extensions=ext))
if conf['H2CH_server_name'] == "mismatch":
node = node.add_child(ExpectAlert(AlertLevel.warning,
AlertDescription.unrecognized_name))
al_node = node
# resumed session: after ServerHello the server's ChangeCipherSpec and
# Finished are expected first, then the client sends its own
if conf['H2SH_SessionID'] == "resume":
print("doing resumption")
node = node.add_child(ExpectServerHello(resume=True))
if conf['H2CH_server_name'] == "mismatch":
# make the sending of warning alert node optional
al_node.next_sibling = node
node = node.add_child(ExpectChangeCipherSpec())
node = node.add_child(ExpectFinished())
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(AlertGenerator(AlertLevel.warning,
AlertDescription.close_notify))
node = node.add_child(ExpectAlert(AlertLevel.warning,
AlertDescription.close_notify))
node.next_sibling = ExpectClose()
else:
# full handshake, same message flow as the first one
node = node.add_child(ExpectServerHello())
if conf['H2CH_server_name'] == "mismatch":
# make the sending of warning alert node optional
al_node.next_sibling = node
node = node.add_child(ExpectCertificate())
# TODO if conf['Certificate_Status_msg']
if conf['H2SKE_dh_group'] != "no_message":
node = node.add_child(ExpectServerKeyExchange())
if conf['H2CR_sent'] == "true":
node = node.add_child(ExpectCertificateRequest())
elif conf['H2CR_sent'] != "false":
raise ValueError("Unknown option in H2CR_sent: {0}"
.format(conf['H2CR_sent']))
node = node.add_child(ExpectServerHelloDone())
if conf['H2CR_sent'] == "true":
if conf['H2CV_signature_scheme'] == "no_message":
node = node.add_child(CertificateGenerator())
else:
node = node.add_child(CertificateGenerator(X509CertChain([cert])))
node = node.add_child(ClientKeyExchangeGenerator())
if conf['H2CV_signature_scheme'] != "no_message":
sig = conf['H2CV_signature_scheme']
if "dsa" in sig:
print("Changing {0} to RSA scheme in CV".format(sig))
sig = sig.replace("ecdsa", "rsa")
sig = sig.replace("dsa", "rsa")
sig = sig.replace("rsa_sha", "rsa_pkcs1_sha")
sig = sig.replace("rsapss", "rsa_pss")
if "sha224" in sig:
scheme = (HashAlgorithm.sha224, SignatureAlgorithm.rsa)
else:
scheme = getattr(SignatureScheme, sig)
node = node.add_child(CertificateVerifyGenerator(key, msg_alg=scheme))
node = node.add_child(ChangeCipherSpecGenerator())
node = node.add_child(FinishedGenerator())
node = node.add_child(ExpectChangeCipherSpec())
node = node.add_child(ExpectFinished())
node = node.add_child(AlertGenerator(AlertLevel.warning,
AlertDescription.close_notify))
node = node.add_child(ExpectAlert(AlertLevel.warning,
AlertDescription.close_notify))
node.next_sibling = ExpectClose()
# the caller gets the first node of the conversation
return root
def process_stdout(name, proc):
    """Copy the standard output of *proc* to the ``out`` queue.

    Reads ``proc.stdout`` line by line until ``readline()`` returns ``b''``.
    Every line is decoded, stripped of trailing whitespace and queued as
    ``"<name>:stdout:<line>"``.
    """
    stream = proc.stdout
    while True:
        raw = stream.readline()
        if raw == b'':
            # readline() returned nothing: stop reading
            break
        text = raw.decode().rstrip()
        out.put("{0}:stdout:{1}".format(name, text))
def process_stderr(name, proc):
    """Copy the standard error of *proc* to the ``out`` queue.

    Reads ``proc.stderr`` line by line until ``readline()`` returns ``b''``.
    Every line is decoded, stripped of trailing whitespace and queued as
    ``"<name>:stderr:<line>"``.
    """
    stream = proc.stderr
    while True:
        raw = stream.readline()
        if raw == b'':
            # readline() returned nothing: stop reading
            break
        text = raw.decode().rstrip()
        out.put("{0}:stderr:{1}".format(name, text))
def wait_till_open(host, port, timeout=10):
    """Wait until a TCP connection to *host*:*port* can be established.

    Connection attempts are repeated until one succeeds or *timeout* seconds
    have passed since the call started. The probing socket is closed after
    every attempt, whether it succeeded or failed.

    host, port -- address to connect to (IPv4)
    timeout -- how long to keep trying, in seconds

    Raises ValueError when no attempt succeeded in time.
    """
    start = time.time()
    while time.time() - start < timeout:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # limit for a single attempt, separate from the overall timeout
        sock.settimeout(5)
        try:
            sock.connect((host, port))
            return
        except socket.error:
            # nothing is listening yet, try again
            pass
        finally:
            # the connection is only a probe, never keep the descriptor
            sock.close()
        # brief pause so that refused connections don't turn into a busy loop
        time.sleep(0.05)
    raise ValueError("Can't connect to server")
def flush_queue():
    """Print all lines currently waiting in the ``out`` queue.

    Items are fetched without blocking; the function returns as soon as the
    queue reports that it is empty.
    """
    try:
        while True:
            print(out.get(False))
    except queue.Empty:
        return
def start_server(server_cmd, server_env=tuple(), server_host=None,
                 server_port=4433):
    """Start the server process and wait until it accepts connections.

    server_cmd -- command line of the server, passed to subprocess.Popen
    server_env -- additional environment variables, in any form accepted by
        dict.update(); they are applied on top of a copy of os.environ
    server_host -- host to probe for the open port ("localhost" when None)
    server_port -- TCP port the server is expected to listen on

    Returns a tuple of the Popen object and the two daemon threads that copy
    the server's stdout and stderr to the ``out`` queue.

    Raises ValueError (from wait_till_open) when the port does not open. The
    server process is stopped and its buffered output is printed before the
    exception is re-raised, as the caller has no handle to do it itself.
    """
    if server_host is None:
        server_host = "localhost"
    my_env = os.environ.copy()
    my_env.update(server_env)
    ret = subprocess.Popen(server_cmd, env=my_env,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           bufsize=1)
    # both pipes need to be read continuously, otherwise the server could
    # block on a full pipe
    thr_stdout = threading.Thread(target=process_stdout, args=('server', ret))
    thr_stdout.daemon = True
    thr_stdout.start()
    thr_stderr = threading.Thread(target=process_stderr, args=('server', ret))
    thr_stderr.daemon = True
    thr_stderr.start()
    try:
        wait_till_open(server_host, server_port)
    except ValueError:
        # Don't leave an orphaned server behind: it would keep running (and
        # possibly keep the port busy) after the failure is reported.
        if ret.poll() is None:
            try:
                ret.terminate()
            except OSError:
                # the process exited between poll() and terminate()
                pass
        ret.wait()
        # give the reader threads a chance to collect the remaining output
        thr_stdout.join(5)
        thr_stderr.join(5)
        flush_queue()
        raise
    return ret, thr_stdout, thr_stderr
def server_config(conf, cert_dir):
    """Return the ``openssl s_server`` command line for a test case.

    conf -- dict with the test case definition (string values)
    cert_dir -- directory with the "<key>.key", "<key>.crt" and
        "<group>.pem" files referenced on the command line

    The result is a list of arguments suitable for subprocess.Popen.
    """
    cmd = ["/usr/bin/openssl", "s_server", "-www"]

    # server key: every ECDSA key is replaced with rsa2048, a DSA key with
    # the RSA key of the same name
    key_name = conf['Server_key']
    if "ecdsa" in key_name:
        key_name = "rsa2048"
    key_name = key_name.replace("dsa", "rsa")
    if key_name == "none":
        cmd.append("-nocert")
    else:
        key_base = cert_dir + '/' + key_name
        cmd.extend(["-key", key_base + ".key",
                    "-cert", key_base + ".crt"])

    # request a client certificate
    if conf['CR_sent'] == "true":
        cmd.extend(["-verify", "1"])

    # XXX assuming that if second handshake is also using DHE group
    # that it is the same group
    group = conf['SKE_dh_group']
    if "ffdhe" in group:
        cmd.extend(["-dhparam", cert_dir + '/' + group + ".pem"])
    # no option is passed for elliptic curves

    if conf['SH_session_ticket_ext'] == "false":
        cmd.append("-no_ticket")

    if conf['SH_compression'] == "deflate":
        cmd.append("-comp")

    # client tries to resume but the server is not supposed to agree
    if conf['H2CH_SessionID'] == "resume" and \
            conf['H2SH_SessionID'] != "resume":
        cmd.append("-no_cache")

    # enable legacy renego
    # NOTE(review): "and" binds tighter than "or", so the Disconnect check
    # applies to the first handshake only; the grouping below spells out
    # how the condition has always been evaluated - confirm it is intended
    if (conf['Disconnect'] == "false" and
            conf['CH_renegotiation_info_SCSV'] == "absent" and
            conf['CH_renegotiation_info_ext'] == "false") or \
            (conf['H2CH_renegotiation_info_SCSV'] == "absent" and
             conf['H2CH_renegotiation_info_ext'] == "false"):
        cmd.append("-legacy_renegotiation")

    # TODO OCSP stapling
    return cmd
# ---- command line handling ----
argv = sys.argv[1:]
test_data = None   # -i: CSV file with the test case definitions
cert = None        # -c: parsed client certificate
key = None         # -k: parsed client private key
run_only = set()   # positional arguments: numbers of test cases to run
cert_dir = "."     # -D: directory with the server's key material
opts, args = getopt.getopt(argv, "hi:k:c:D:")
for opt, arg in opts:
    if opt == "-h":
        help_msg()
        # the os module has no exit() function, sys.exit() is the one to use
        sys.exit(0)
    elif opt == "-i":
        test_data = arg
    elif opt == "-k":
        with open(arg, 'rb') as key_file:
            text_key = key_file.read()
        # the file was read as bytes, convert it to str on Python 3
        if sys.version_info[0] >= 3:
            text_key = str(text_key, 'utf-8')
        key = parsePEMKey(text_key, private=True)
    elif opt == "-c":
        with open(arg, 'rb') as cert_file:
            text_cert = cert_file.read()
        if sys.version_info[0] >= 3:
            text_cert = str(text_cert, 'utf-8')
        cert = X509()
        cert.parse(text_cert)
    elif opt == "-D":
        cert_dir = arg
    else:
        raise ValueError("Unexpected option: {0}".format(opt))
# an empty selection means that all test cases are run
for arg in args:
    run_only.add(int(arg))
if test_data is None:
    raise ValueError("No file specified")
# Run the rows of the CSV file as test cases, each one against a freshly
# started server, then print a summary.
with open(test_data) as f:
# position the file at the CSV header, past the leading '#' lines
skip_comments(f)
reader = csv.DictReader(f)
# counters and lists of row numbers for the final summary
good = 0
bad = 0
skip = 0
failed = []
skipped = []
for i, row in enumerate(reader):
# row numbers given on the command line limit what is run; the numbers
# come from enumerate(), so the first data row is 0
if run_only and i not in run_only:
continue
print("Processing TC#: {0}".format(i))
conv = conv_generator(row, 'localhost', 4433, 'localhost', cert, key)
# NOTE(review): conv_generator() as defined in this file either returns
# the root node or raises, so this branch looks unreachable - confirm
if conv is None:
skip += 1
skipped.append(i)
continue
print("starting server...")
try:
params = server_config(row, cert_dir)
srv, srv_out, srv_err = start_server(params, [], "localhost", 4433)
print("...")
runner = Runner(conv)
res = True
# any exception raised while running the conversation marks the test case
# as failed; the traceback is printed and the loop goes on
try:
runner.run()
except Exception:
print("Error while processing")
print(traceback.format_exc())
res = False
# signal 15 (SIGTERM on POSIX) asks the server to stop
srv.send_signal(15)
srv.wait()
print("Server process killed: {0}".format(srv.returncode))
# wait for the reader threads to finish so that the flush_queue() call
# below prints the complete server output
srv_out.join()
srv_err.join()
flush_queue()
if res:
good += 1
print("OK\n")
else:
bad += 1
failed.append(i)
print(sorted(row.items()))
finally:
# print whatever server output is still queued, also when an exception
# is propagating
flush_queue()
# summary of the whole run
# NOTE(review): no exit status is derived from `bad` in the code visible
# here - confirm whether callers rely on the printed summary only
print()
print("Test end")
print("successful: {0}".format(good))
print("skipped: {0}".format(skip))
print(" {0}".format("\n ".join(str(i) for i in skipped)))
print("failed: {0}".format(bad))
print(" {0}".format("\n ".join(str(i) for i in failed)))

Computing file changes ...