from __future__ import print_function import os import csv import sys import time import getopt import socket import threading import traceback import subprocess try: import queue except ImportError: import Queue as queue from tlslite.handshakesettings import HandshakeSettings from tlslite.constants import CipherSuite, ExtensionType, SignatureScheme, \ SignatureAlgorithm, HashAlgorithm, AlertDescription, AlertLevel, \ GroupName, ECPointFormat from tlslite.extensions import TLSExtension, SNIExtension, \ RenegotiationInfoExtension, SignatureAlgorithmsExtension, \ SupportedGroupsExtension, StatusRequestExtension, \ ECPointFormatsExtension from tlslite.utils.cryptomath import getRandomBytes from tlslite.utils.keyfactory import parsePEMKey from tlslite.x509 import X509 from tlslite.x509certchain import X509CertChain from tlsfuzzer.messages import Connect, ClientHelloGenerator, \ CertificateGenerator, CertificateVerifyGenerator, \ ClientKeyExchangeGenerator, ChangeCipherSpecGenerator, \ FinishedGenerator, AlertGenerator, ResetRenegotiationInfo, \ ResetHandshakeHashes from tlsfuzzer.expect import ExpectServerHello, ExpectCertificate, \ ExpectServerHelloDone, ExpectServerKeyExchange, \ ExpectCertificateRequest, ExpectChangeCipherSpec, ExpectFinished, \ ExpectAlert, ExpectClose from tlsfuzzer.runner import Runner out = queue.Queue() def help_msg(): print("Usage -i definitions.csv") print("-i file name of the CSV file generated by ACTS with") print(" definitions of all the test cases to run") def skip_comments(f): """Skip the comments (lines starting with '#') in CSV file.""" while True: pos = f.tell() r = f.read(1) if r == b'#': f.readline() continue else: f.seek(pos) break def conv_generator(conf, host, port, sni_hostname, cert=None, key=None): """Generate a conversation based on dict with configuration.""" root = Connect(host, port) hs = HandshakeSettings() # only RSA is supported if conf['Server_authentication'] != "RSA" and \ conf['Server_authentication'] != "anon": print("Substituting {0} to RSA for server auth" .format(conf['Server_authentication']), file=sys.stderr) # get the cipher that matches the imposed restrictions cipher_trans = {"AES_128_CBC": "aes128", "AES_256_CBC": "aes256", "AES_128_GCM": "aes128gcm", "AES_256_GCM": "aes256gcm", "3DES_EDE_CBC": "3des", "RC4": "rc4", "Chacha20_Poly1305": "chacha20-poly1305"} hs.cipherNames = [cipher_trans.get(conf['Cipher'], None)] if hs.cipherNames == [None]: raise ValueError("Unknown cipher type: {0}".format(conf['Cipher'])) mac_trans = {"AEAD": "aead", "MD5_HMAC": "md5", "SHA1_HMAC": "sha", "SHA256_HMAC": "sha256", "SHA384_HMAC": "sha384"} hs.macNames = [mac_trans.get(conf['Integrity'], None)] if hs.macNames == [None]: raise ValueError("Unknown integrity type: {0}" .format(conf['Integrity'])) if conf['Key_exchange'] == 'DHE' and \ conf['Server_authentication'] == "anon": suites = CipherSuite.getAnonSuites(hs) elif conf['Key_exchange'] == 'ECDHE' and \ conf['Server_authentication'] == "anon": suites = CipherSuite.getEcdhAnonSuites(hs) elif conf['Key_exchange'] == 'RSA': suites = CipherSuite.getCertSuites(hs) elif conf['Key_exchange'] == 'DHE': suites = CipherSuite.getDheCertSuites(hs) elif conf['Key_exchange'] == 'ECDHE': suites = CipherSuite.getEcdheCertSuites(hs) else: raise ValueError("Unknown key exchange type: {0}" .format(conf['Key_exchange'])) if not suites: raise ValueError("Couldn't find matching cipher for {0} {3} {1} {2}" .format(conf['Key_exchange'], conf['Cipher'], conf['Integrity'], conf['Server_authentication'])) # session ID/resumption handling if conf['CH_SessionID'] == 'random': sess_ID = getRandomBytes(16) elif conf['CH_SessionID'] == 'empty': sess_ID = bytearray() else: raise ValueError("Unknown CH_SessionID value" .format(conf['CH_SessionID'])) # compression if conf['CH_compression'] == 'null_only': compress = [0] elif conf['CH_compression'] == 'null_and_deflate': compress = [0, 1] else: raise ValueError("Unknown CH_compression value: {0}" .format(conf['CH_compression'])) # Renegotiation Info if conf['CH_renegotiation_info_SCSV'] == "first": suites.insert(0, CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV) elif conf['CH_renegotiation_info_SCSV'] == "last": suites.append(CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV) elif conf['CH_renegotiation_info_SCSV'] == "absent": pass elif conf['CH_renegotiation_info_SCSV'] == "second": suites.append(CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV) suites.append(0xeaea) # GREASE else: raise ValueError("Unexpected CH_renegotiation_info_SCSV value: {0}" .format(conf['CH_renegotiation_info_SCSV'])) # whether to send extensions if conf['CH_extensions_present'] == "false": ext = None elif conf['CH_extensions_present'] != "true": raise ValueError("Unexpected CH_extensions_present value: {0}" .format(conf['CH_extensions_present'])) else: ext = dict() # session ticket if conf['CH_session_ticket'] != "no_ext": print("Not generating session ticket extension", file=sys.stderr) # renegotiation info if conf['CH_renegotiation_info_ext'] == "true": ext[ExtensionType.renegotiation_info] = \ RenegotiationInfoExtension().create(bytearray()) elif conf['CH_renegotiation_info_ext'] == "false": pass else: raise ValueError("Unknown option in CH_renegotiation_info_ext: {0}" .format(conf['CH_renegotiation_info_ext'])) # signature algorithms if conf['CH_signature_algorithms_ext'] == "false": pass elif conf['CH_signature_algorithms_ext'] != "true": raise ValueError("Unknown option CH_signature_algorithms_ext: {0}" .format(conf["CH_signature_algorithms_ext"])) else: sig = conf['SKE_signature_scheme'] if sig == "none" or sig == "no_message": # enter some random ones: ext[ExtensionType.signature_algorithms] = \ SignatureAlgorithmsExtension()\ .create([SignatureScheme.rsa_pkcs1_sha256, SignatureScheme.rsa_pss_sha256]) else: if "dsa" in sig: print("Changing {0} to RSA scheme".format(sig)) sig = sig.replace("ecdsa", "rsa") sig = sig.replace("dsa", "rsa") sig = sig.replace("rsa_sha", "rsa_pkcs1_sha") sig = sig.replace("rsapss", "rsa_pss") if "sha224" in sig: scheme = (HashAlgorithm.sha224, SignatureAlgorithm.rsa) else: scheme = getattr(SignatureScheme, sig) ext[ExtensionType.signature_algorithms] = \ SignatureAlgorithmsExtension()\ .create([scheme]) # supported groups extension if conf['CH_supported_groups_ext'] == "false": groups = [GroupName.ffdhe2048, GroupName.secp256r1, GroupName.x25519] ext[ExtensionType.supported_groups] = \ SupportedGroupsExtension().create(groups) pass elif conf['CH_supported_groups_ext'] != "true": raise ValueError("Unknown option in CH_supported_groups_ext: {0}" .format(conf['CH_supported_groups_ext'])) else: if conf['SKE_dh_group'] == "no_message": groups = [GroupName.ffdhe2048, GroupName.secp256r1, GroupName.x25519] elif conf['SKE_dh_group'] == "ffdhe1024": groups = [GroupName.secp256r1, GroupName.x25519] else: groups = [getattr(GroupName, conf['SKE_dh_group'])] ext[ExtensionType.supported_groups] = \ SupportedGroupsExtension().create(groups) ext[ExtensionType.ec_point_formats] = \ ECPointFormatsExtension()\ .create([ECPointFormat.uncompressed, ECPointFormat.ansiX962_compressed_char2, ECPointFormat.ansiX962_compressed_prime]) # encrypt then MAC if conf['CH_encrypt_then_mac_ext'] == "false": pass elif conf['CH_encrypt_then_mac_ext'] != "true": raise ValueError("Unknown option in CH_encrypt_then_mac_ext: {0}" .format(conf['CH_encrypt_then_mac_ext'])) else: ext[ExtensionType.encrypt_then_mac] = \ TLSExtension(extType=ExtensionType.encrypt_then_mac)\ .create(bytearray(0)) # server name if conf['CH_server_name'] == "no_ext": pass elif conf['CH_server_name'] == "correct": ext[ExtensionType.server_name] = \ SNIExtension().create(sni_hostname) elif conf['CH_server_name'] == "mismatch": ext[ExtensionType.server_name] = \ SNIExtension().create(sni_hostname + b'.www') else: raise ValueError("Unknown option in CH_server_name: {0}" .format(conf['CH_server_name'])) # OCSP staple if conf['CH_status_request_ext'] == "false": pass elif conf['CH_status_request_ext'] != "true": raise ValueError("Unknown option in CH_status_request_ext: {0}" .format(conf['CH_status_request_ext'])) else: ext[ExtensionType.status_request] = \ StatusRequestExtension().create() # Extended Master Secret ext if conf['CH_extended_master_secret_ext'] == "false": pass elif conf['CH_extended_master_secret_ext'] != "true": raise ValueError(("Unknown value in CH_extended_master_secret_ext" ": {0}") .format(conf['CH_extended_master_secret_ext'])) else: ext[ExtensionType.extended_master_secret] = \ TLSExtension(extType=ExtensionType.extended_master_secret)\ .create(bytearray()) # # node = root.add_child(ClientHelloGenerator(suites, session_id=sess_ID, compression=compress, extensions=ext)) if conf['CH_server_name'] == "mismatch": node = node.add_child(ExpectAlert(AlertLevel.warning, AlertDescription.unrecognized_name)) al_node = node node = node.add_child(ExpectServerHello()) if conf['CH_server_name'] == "mismatch": # make the sending of warning alert node optional al_node.next_sibling = node node = node.add_child(ExpectCertificate()) # TODO if conf['Certificate_Status_msg'] if conf['SKE_dh_group'] != "no_message": node = node.add_child(ExpectServerKeyExchange()) if conf['CR_sent'] == "true": node = node.add_child(ExpectCertificateRequest()) elif conf['CR_sent'] != "false": raise ValueError("Unknown option in CR_sent: {0}" .format(conf['CR_sent'])) node = node.add_child(ExpectServerHelloDone()) if conf['CR_sent'] == "true": if conf['CV_signature_scheme'] == "no_message": node = node.add_child(CertificateGenerator()) else: node = node.add_child(CertificateGenerator(X509CertChain([cert]))) node = node.add_child(ClientKeyExchangeGenerator()) if conf['CV_signature_scheme'] != "no_message": sig = conf['CV_signature_scheme'] if "dsa" in sig: print("Changing {0} to RSA scheme in CV".format(sig)) sig = sig.replace("ecdsa", "rsa") sig = sig.replace("dsa", "rsa") sig = sig.replace("rsa_sha", "rsa_pkcs1_sha") sig = sig.replace("rsapss", "rsa_pss") if "sha224" in sig: scheme = (HashAlgorithm.sha224, SignatureAlgorithm.rsa) else: scheme = getattr(SignatureScheme, sig) node = node.add_child(CertificateVerifyGenerator(key, msg_alg=scheme)) node = node.add_child(ChangeCipherSpecGenerator()) node = node.add_child(FinishedGenerator()) node = node.add_child(ExpectChangeCipherSpec()) node = node.add_child(ExpectFinished()) if conf['Disconnect'] == "true": node = node.add_child(AlertGenerator(AlertLevel.warning, AlertDescription.close_notify)) node = node.add_child(ExpectAlert(AlertLevel.warning, AlertDescription.close_notify)) node.next_sibling = ExpectClose() node = node.add_child(node.next_sibling) node = node.add_child(Connect(host, port)) node = node.add_child(ResetRenegotiationInfo()) node = node.add_child(ResetHandshakeHashes()) hs = HandshakeSettings() hs.cipherNames = [cipher_trans.get(conf['H2Cipher'], None)] if hs.cipherNames == [None]: raise ValueError("Unknown cipher type: {0}" .format(conf['H2Cipher'])) hs.macNames = [mac_trans.get(conf["H2Integrity"], None)] if hs.macNames == [None]: raise ValueError("Unknown integrity type: {0}" .format(conf['H2Integrity'])) if conf['H2Key_exchange'] == 'DHE' and \ conf['H2Server_authentication'] == "anon": suites = CipherSuite.getAnonSuites(hs) elif conf['H2Key_exchange'] == "ECDHE" and \ conf['H2Server_authentication'] == "anon": suites = CipherSuite.getEcdhAnonSuites(hs) elif conf['H2Key_exchange'] == "RSA": suites = CipherSuite.getCertSuites(hs) elif conf['H2Key_exchange'] == "DHE": suites = CipherSuite.getDheCertSuites(hs) elif conf['H2Key_exchange'] == "ECDHE": suites = CipherSuite.getEcdheCertSuites(hs) else: raise ValueError("Unknown key exchange type: {0}" .format(conf['H2Key_exchange'])) if not suites: raise ValueError("Couldn't find matching cipher for {0} {3} {1} {2}" .format(conf['H2Key_exchange'], conf['H2Cipher'], conf['H2Integrity'], conf['H2Server_authentication'])) if conf['H2CH_SessionID'] == 'random': sess_ID = getRandomBytes(16) elif conf['H2CH_SessionID'] == 'empty': sess_ID = bytearray() elif conf['H2CH_SessionID'] == "resume": sess_ID = None else: raise ValueError("Unknown session id value: {0}" .format(conf['H2CH_SessionID'])) # compression if conf['H2CH_compression'] == 'null_only': compress = [0] elif conf['H2CH_compression'] == 'null_and_deflate': compress = [0, 1] else: raise ValueError("Unknown H2CH_compression value: {0}" .format(conf['H2CH_compression'])) # Renegotiation Info if conf['H2CH_renegotiation_info_SCSV'] == "first": suites.insert(0, CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV) elif conf['H2CH_renegotiation_info_SCSV'] == "last": suites.append(CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV) elif conf['H2CH_renegotiation_info_SCSV'] == "absent": pass elif conf['H2CH_renegotiation_info_SCSV'] == "second": suites.append(CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV) suites.append(0xeaea) # GREASE else: raise ValueError("Unexpected H2CH_renegotiation_info_SCSV value: {0}" .format(conf['H2CH_renegotiation_info_SCSV'])) # whether to send extensions if conf['H2CH_extensions_present'] == "false": ext = None elif conf['H2CH_extensions_present'] != "true": raise ValueError("Unexpected CH_extensions_present value: {0}" .format(conf['H2CH_extensions_present'])) else: ext = dict() # session ticket if conf['H2CH_session_ticket'] != "no_ext": print("Not generating session ticket extension", file=sys.stderr) # renegotiation info if conf['H2CH_renegotiation_info_ext'] == "true": ext[ExtensionType.renegotiation_info] = None elif conf['H2CH_renegotiation_info_ext'] == "false": pass else: raise ValueError("Unknown option in H2CH_renegotiation_info_ext: " "{0}" .format(conf['H2CH_renegotiation_info_ext'])) # signature algorithms if conf['H2CH_signature_algorithms_ext'] == "false": pass elif conf['H2CH_signature_algorithms_ext'] != "true": raise ValueError("Unknown option H2CH_signature_algorithms_ext: " "{0}" .format(conf["H2CH_signature_algorithms_ext"])) else: sig = conf['H2SKE_signature_scheme'] if sig == "none" or sig == "no_message": # enter some random ones: ext[ExtensionType.signature_algorithms] = \ SignatureAlgorithmsExtension()\ .create([SignatureScheme.rsa_pkcs1_sha256, SignatureScheme.rsa_pss_sha256]) else: if "dsa" in sig: print("Changing {0} to RSA scheme".format(sig)) sig = sig.replace("ecdsa", "rsa") sig = sig.replace("dsa", "rsa") sig = sig.replace("rsa_sha", "rsa_pkcs1_sha") sig = sig.replace("rsapss", "rsa_pss") if "sha224" in sig: scheme = (HashAlgorithm.sha224, SignatureAlgorithm.rsa) else: scheme = getattr(SignatureScheme, sig) ext[ExtensionType.signature_algorithms] = \ SignatureAlgorithmsExtension()\ .create([scheme]) # supported groups extension if conf['H2CH_supported_groups_ext'] == "false": groups = [GroupName.ffdhe2048, GroupName.secp256r1, GroupName.x25519] ext[ExtensionType.supported_groups] = \ SupportedGroupsExtension().create(groups) pass elif conf['H2CH_supported_groups_ext'] != "true": raise ValueError("Unknown option in H2CH_supported_groups_ext: {0}" .format(conf['H2CH_supported_groups_ext'])) else: if conf['H2SKE_dh_group'] == "no_message": groups = [GroupName.ffdhe2048, GroupName.secp256r1, GroupName.x25519] elif conf['H2SKE_dh_group'] == "ffdhe1024": groups = [GroupName.secp256r1, GroupName.x25519] else: groups = [getattr(GroupName, conf['H2SKE_dh_group'])] ext[ExtensionType.supported_groups] = \ SupportedGroupsExtension().create(groups) ext[ExtensionType.ec_point_formats] = \ ECPointFormatsExtension()\ .create([ECPointFormat.uncompressed, ECPointFormat.ansiX962_compressed_char2, ECPointFormat.ansiX962_compressed_prime]) # encrypt then MAC if conf['H2CH_encrypt_then_mac_ext'] == "false": pass elif conf['H2CH_encrypt_then_mac_ext'] != "true": raise ValueError("Unknown option in H2CH_encrypt_then_mac_ext: {0}" .format(conf['H2CH_encrypt_then_mac_ext'])) else: ext[ExtensionType.encrypt_then_mac] = \ TLSExtension(extType=ExtensionType.encrypt_then_mac)\ .create(bytearray(0)) # server name if conf['H2CH_server_name'] == "no_ext": pass elif conf['H2CH_server_name'] == "correct": ext[ExtensionType.server_name] = \ SNIExtension().create(sni_hostname) elif conf['H2CH_server_name'] == "mismatch": ext[ExtensionType.server_name] = \ SNIExtension().create(sni_hostname + b'.www') else: raise ValueError("Unknown option in H2CH_server_name: {0}" .format(conf['H2CH_server_name'])) # OCSP staple if conf['H2CH_status_request_ext'] == "false": pass elif conf['H2CH_status_request_ext'] != "true": raise ValueError("Unknown option in H2CH_status_request_ext: {0}" .format(conf['H2CH_status_request_ext'])) else: ext[ExtensionType.status_request] = \ StatusRequestExtension().create() # Extended Master Secret ext if conf['H2CH_extended_master_secret_ext'] == "false": pass elif conf['H2CH_extended_master_secret_ext'] != "true": raise ValueError(("Unknown value in H2CH_extended_master_secret_ext" ": {0}") .format(conf['H2CH_extended_master_secret_ext'])) else: ext[ExtensionType.extended_master_secret] = \ TLSExtension(extType=ExtensionType.extended_master_secret)\ .create(bytearray()) node = node.add_child(ClientHelloGenerator(suites, session_id=sess_ID, compression=compress, extensions=ext)) if conf['H2CH_server_name'] == "mismatch": node = node.add_child(ExpectAlert(AlertLevel.warning, AlertDescription.unrecognized_name)) al_node = node if conf['H2SH_SessionID'] == "resume": print("doing resumption") node = node.add_child(ExpectServerHello(resume=True)) if conf['H2CH_server_name'] == "mismatch": # make the sending of warning alert node optional al_node.next_sibling = node node = node.add_child(ExpectChangeCipherSpec()) node = node.add_child(ExpectFinished()) node = node.add_child(ChangeCipherSpecGenerator()) node = node.add_child(FinishedGenerator()) node = node.add_child(AlertGenerator(AlertLevel.warning, AlertDescription.close_notify)) node = node.add_child(ExpectAlert(AlertLevel.warning, AlertDescription.close_notify)) node.next_sibling = ExpectClose() else: node = node.add_child(ExpectServerHello()) if conf['H2CH_server_name'] == "mismatch": # make the sending of warning alert node optional al_node.next_sibling = node node = node.add_child(ExpectCertificate()) # TODO if conf['Certificate_Status_msg'] if conf['H2SKE_dh_group'] != "no_message": node = node.add_child(ExpectServerKeyExchange()) if conf['H2CR_sent'] == "true": node = node.add_child(ExpectCertificateRequest()) elif conf['H2CR_sent'] != "false": raise ValueError("Unknown option in H2CR_sent: {0}" .format(conf['H2CR_sent'])) node = node.add_child(ExpectServerHelloDone()) if conf['H2CR_sent'] == "true": if conf['H2CV_signature_scheme'] == "no_message": node = node.add_child(CertificateGenerator()) else: node = node.add_child(CertificateGenerator(X509CertChain([cert]))) node = node.add_child(ClientKeyExchangeGenerator()) if conf['H2CV_signature_scheme'] != "no_message": sig = conf['H2CV_signature_scheme'] if "dsa" in sig: print("Changing {0} to RSA scheme in CV".format(sig)) sig = sig.replace("ecdsa", "rsa") sig = sig.replace("dsa", "rsa") sig = sig.replace("rsa_sha", "rsa_pkcs1_sha") sig = sig.replace("rsapss", "rsa_pss") if "sha224" in sig: scheme = (HashAlgorithm.sha224, SignatureAlgorithm.rsa) else: scheme = getattr(SignatureScheme, sig) node = node.add_child(CertificateVerifyGenerator(key, msg_alg=scheme)) node = node.add_child(ChangeCipherSpecGenerator()) node = node.add_child(FinishedGenerator()) node = node.add_child(ExpectChangeCipherSpec()) node = node.add_child(ExpectFinished()) node = node.add_child(AlertGenerator(AlertLevel.warning, AlertDescription.close_notify)) node = node.add_child(ExpectAlert(AlertLevel.warning, AlertDescription.close_notify)) node.next_sibling = ExpectClose() return root def process_stdout(name, proc): for line in iter(proc.stdout.readline, b''): line = line.decode() line = line.rstrip() out.put("{0}:stdout:{1}".format(name, line)) def process_stderr(name, proc): for line in iter(proc.stderr.readline, b''): line = line.decode() line = line.rstrip() out.put("{0}:stderr:{1}".format(name, line)) def wait_till_open(host, port): t1 = time.time() while time.time() - t1 < 10: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) try: sock.connect((host, port)) except socket.error as e: continue break else: raise ValueError("Can't connect to server") def flush_queue(): while True: try: line = out.get(False) print(line) #count_tc_passes(line) except queue.Empty: break def start_server(server_cmd, server_env=tuple(), server_host=None, server_port=4433): if server_host is None: server_host = "localhost" my_env = os.environ.copy() my_env.update(server_env) ret = subprocess.Popen(server_cmd, env=my_env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1) thr_stdout = threading.Thread(target=process_stdout, args=('server', ret)) thr_stdout.daemon = True thr_stdout.start() thr_stderr = threading.Thread(target=process_stderr, args=('server', ret)) thr_stderr.daemon = True thr_stderr.start() try: wait_till_open(server_host, server_port) except ValueError: flush_queue() raise return ret, thr_stdout, thr_stderr def server_config(conf, cert_dir): params = ["/usr/bin/openssl", "s_server", "-www"] key = conf['Server_key'] if "ecdsa" in key: key = "rsa2048" key = key.replace("dsa", "rsa") if key == "none": params += ["-nocert"] else: params += ["-key", cert_dir + '/' + key + ".key", "-cert", cert_dir + '/' + key + ".crt"] if conf['CR_sent'] == "true": params += ["-verify", "1"] dh_param = conf['SKE_dh_group'] # XXX assuming that if second handshake is also using DHE group # that it is the same group if "ffdhe" in dh_param: params += ["-dhparam", cert_dir + '/' + dh_param + ".pem"] # only really needed for OpenSSL 1.0.1, later select curve automatically #elif dh_param != "no_message": # if dh_param != "x25519" and dh_param != "x448": # dh_param = dh_param.replace("secp256r1", "prime256v1") # params += ["-named_curve", dh_param] if conf['SH_session_ticket_ext'] == "false": params += ["-no_ticket"] if conf['SH_compression'] == "deflate": params += ["-comp"] if conf['H2CH_SessionID'] == "resume" and \ conf['H2SH_SessionID'] != "resume": params += ["-no_cache"] # enable legacy renego if conf['Disconnect'] == "false" and \ (conf['CH_renegotiation_info_SCSV'] == "absent" and \ conf['CH_renegotiation_info_ext'] == "false") or \ (conf['H2CH_renegotiation_info_SCSV'] == "absent" and \ conf['H2CH_renegotiation_info_ext'] == "false"): params += ["-legacy_renegotiation"] # TODO OCSP stapling return params argv = sys.argv[1:] test_data = None cert = None key = None run_only = set() cert_dir = "." opts, args = getopt.getopt(argv, "hi:k:c:D:") for opt, arg in opts: if opt == "-h": help_msg() os.exit(0) elif opt == "-i": test_data = arg elif opt == "-k": text_key = open(arg, 'rb').read() if sys.version_info[0] >= 3: text_key = str(text_key, 'utf-8') key = parsePEMKey(text_key, private=True) elif opt == "-c": text_cert = open(arg, 'rb').read() if sys.version_info[0] >= 3: text_cert = str(text_cert, 'utf-8') cert = X509() cert.parse(text_cert) elif opt == "-D": cert_dir = arg else: raise ValueError("Unexpected option: {0}".format(opt)) if args: for arg in args: run_only.add(int(arg)) if test_data is None: raise ValueError("No file specified") with open(test_data) as f: skip_comments(f) reader = csv.DictReader(f) good = 0 bad = 0 skip = 0 failed = [] skipped = [] for i, row in enumerate(reader): if run_only and i not in run_only: continue print("Processing TC#: {0}".format(i)) conv = conv_generator(row, 'localhost', 4433, 'localhost', cert, key) if conv is None: skip += 1 skipped.append(i) continue print("starting server...") try: params = server_config(row, cert_dir) srv, srv_out, srv_err = start_server(params, [], "localhost", 4433) print("...") runner = Runner(conv) res = True try: runner.run() except Exception: print("Error while processing") print(traceback.format_exc()) res = False srv.send_signal(15) srv.wait() print("Server process killed: {0}".format(srv.returncode)) srv_out.join() srv_err.join() flush_queue() if res: good += 1 print("OK\n") else: bad += 1 failed.append(i) print(sorted(row.items())) finally: flush_queue() print() print("Test end") print("successful: {0}".format(good)) print("skipped: {0}".format(skip)) print(" {0}".format("\n ".join(str(i) for i in skipped))) print("failed: {0}".format(bad)) print(" {0}".format("\n ".join(str(i) for i in failed)))