Revision 771e9f59d639dbb0e2fa8e646c8e588405d3903e authored by Hubert Kario on 09 September 2021, 19:24:48 UTC, committed by GitHub on 09 September 2021, 19:24:48 UTC
Merge pull request #474 from tlsfuzzer/ubuntu18.04
use newer ubuntu for old pythons
2 parents: 7b7a811 + b3e9049
tls.py
#!/usr/bin/env python

# Authors: 
#   Trevor Perrin
#   Marcelo Fernandez - bugfix and NPN support
#   Martin von Loewis - python 3 port
#
# See the LICENSE file for legal information regarding use of this file.
from __future__ import print_function
import sys
import os
import os.path
import socket
import struct
import getopt
import binascii
try:
    import httplib
    from SocketServer import *
    from BaseHTTPServer import *
    from SimpleHTTPServer import *
except ImportError:
    # Python 3.x
    from http import client as httplib
    from socketserver import *
    from http.server import *
    from http.server import SimpleHTTPRequestHandler

if __name__ != "__main__":
    raise Exception("This must be run as a command, not used as a module!")

from tlslite.api import *
from tlslite.constants import CipherSuite, HashAlgorithm, SignatureAlgorithm, \
        GroupName, SignatureScheme
from tlslite.handshakesettings import Keypair, VirtualHost
from tlslite import __version__
from tlslite.utils.compat import b2a_hex, a2b_hex, time_stamp
from tlslite.utils.dns_utils import is_valid_hostname
from tlslite.utils.cryptomath import getRandomBytes
from tlslite.constants import KeyUpdateMessageType

try:
    from tack.structures.Tack import Tack

except ImportError:
    pass

def printUsage(s=None):
    if s:
        print("ERROR: %s" % s)

    print("")
    print("Version: %s" % __version__)
    print("")
    print("RNG: %s" % prngName)
    print("")
    print("Modules:")
    if tackpyLoaded:
        print("  tackpy      : Loaded")
    else:
        print("  tackpy      : Not Loaded")            
    if m2cryptoLoaded:
        print("  M2Crypto    : Loaded")
    else:
        print("  M2Crypto    : Not Loaded")
    if pycryptoLoaded:
        print("  pycrypto    : Loaded")
    else:
        print("  pycrypto    : Not Loaded")
    if gmpyLoaded:
        print("  GMPY        : Loaded")
    else:
        print("  GMPY        : Not Loaded")
    if GMPY2_LOADED:
        print("  GMPY2       : Loaded")
    else:
        print("  GMPY2       : Not Loaded")

    print("")
    print("""Commands:

  server  
    [-c CERT] [-k KEY] [-t TACK] [-v VERIFIERDB] [-d DIR] [-l LABEL] [-L LENGTH]
    [--reqcert] [--param DHFILE] [--psk PSK] [--psk-ident IDENTITY]
    [--psk-sha384] [--ssl3] [--max-ver VER] [--tickets COUNT] [--cipherlist]
    [--request-pha] [--require-pha]
    HOST:PORT

  client
    [-c CERT] [-k KEY] [-u USER] [-p PASS] [-l LABEL] [-L LENGTH] [-a ALPN]
    [--psk PSK] [--psk-ident IDENTITY] [--psk-sha384] [--resumption] [--ssl3]
    [--max-ver VER] [--cipherlist]
    HOST:PORT

  LABEL - TLS exporter label
  LENGTH - amount of info to export using TLS exporter
  ALPN - name of protocol for ALPN negotiation, can be present multiple times
         in client to specify multiple protocols supported
  DHFILE - file that includes Diffie-Hellman parameters to be used with DHE
           key exchange
  PSK - hex encoded (without starting 0x) shared key to be used for connection
  IDENTITY - name associated with the PSK key
  --ssl3 - enable support for SSLv3
  VER - TLS version as a string, "ssl3", "tls1.0", "tls1.1", "tls1.2" or
        "tls1.3"
  --tickets COUNT - how many tickets should server send after handshake is
                    finished
  --cipherlist - comma separated ciphers to enable. For ex. aes128ccm,3des
                 You can specify this option multiple times.
  --request-pha - ask client for post-handshake authentication
  --require-pha - abort connection if client didn't provide certificate in
                  post-handshake authentication
  CERT, KEY - the file with key and certificates that will be used by client or
        server. The server can accept multiple pairs of `-c` and `-k` options
        to configure different certificates (like RSA and ECDSA)

""")
    sys.exit(-1)


def ver_to_tuple(name):
    """Convert a protocol name like "tls1.2" to its (major, minor) tuple."""
    vers = {"ssl3": (3, 0),
            "tls1.0": (3, 1),
            "tls1.1": (3, 2),
            "tls1.2": (3, 3),
            "tls1.3": (3, 4)}
    try:
        return vers[name]
    except KeyError:
        raise ValueError("Unknown protocol name: {0}".format(name))


def printError(s):
    """Print error message and exit"""
    sys.stderr.write("ERROR: %s\n" % s)
    sys.exit(-1)


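def handleArgs(argv, argString, flagsList=[]):
    """Parse argv and return [address, ...parsed values in a fixed order].

    argString lists the short options (each takes a value); flagsList lists
    the long options in getopt format ("name" or "name=").
    """
    # Convert to getopt argstring format:
    # Add ":" after each arg, i.e. "abc" -> "a:b:c:"
    getOptArgString = ":".join(argString) + ":"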
    try:
        opts, argv = getopt.getopt(argv, getOptArgString, flagsList)
    except getopt.GetoptError as e:
        printError(e) 
    # Default values if arg not present  
    privateKey = None
    cert_chain = None
    virtual_hosts = []
    v_host_cert = None
    username = None
    password = None
    tacks = None
    verifierDB = None
    reqCert = False
    directory = None
    expLabel = None
    expLength = 20
    alpn = []
    dhparam = None
    psk = None
    psk_ident = None
    psk_hash = 'sha256'
    resumption = False
    ssl3 = False
    max_ver = None
    tickets = None
    ciphers = []
    request_pha = False
    require_pha = False

    for opt, arg in opts:
        if opt == "-k":
            s = open(arg, "rb").read()
            if sys.version_info[0] >= 3:
                s = str(s, 'utf-8')
            # OpenSSL/m2crypto does not support RSASSA-PSS certificates
            if not privateKey:
                privateKey = parsePEMKey(s, private=True,
                                         implementations=["python"])
            else:
                if not v_host_cert:
                    raise ValueError("Virtual host certificate missing "
                                     "(must be listed before key)")
                p_key = parsePEMKey(s, private=True,
                                    implementations=["python"])
                if not virtual_hosts:
                    virtual_hosts.append(VirtualHost())
                virtual_hosts[0].keys.append(
                    Keypair(p_key, v_host_cert.x509List))
                v_host_cert = None
        elif opt == "-c":
            s = open(arg, "rb").read()
            if sys.version_info[0] >= 3:
                s = str(s, 'utf-8')
            if not cert_chain:
                cert_chain = X509CertChain()
                cert_chain.parsePemList(s)
            else:
                v_host_cert = X509CertChain()
                v_host_cert.parsePemList(s)
        elif opt == "-u":
            username = arg
        elif opt == "-p":
            password = arg
        elif opt == "-t":
            if tackpyLoaded:
                # "U" mode was removed in Python 3.11; text mode already
                # translates newlines on Python 3
                s = open(arg, "r").read()
                tacks = Tack.createFromPemList(s)
        elif opt == "-v":
            verifierDB = VerifierDB(arg)
            verifierDB.open()
        elif opt == "-d":
            directory = arg
        elif opt == "--reqcert":
            reqCert = True
        elif opt == "-l":
            expLabel = arg
        elif opt == "-L":
            expLength = int(arg)
        elif opt == "-a":
            alpn.append(bytearray(arg, 'utf-8'))
        elif opt == "--param":
            s = open(arg, "rb").read()
            if sys.version_info[0] >= 3:
                s = str(s, 'utf-8')
            dhparam = parseDH(s)
        elif opt == "--psk":
            psk = a2b_hex(arg)
        elif opt == "--psk-ident":
            psk_ident = bytearray(arg, 'utf-8')
        elif opt == "--psk-sha384":
            psk_hash = 'sha384'
        elif opt == "--resumption":
            resumption = True
        elif opt == "--ssl3":
            ssl3 = True
        elif opt == "--max-ver":
            max_ver = ver_to_tuple(arg)
        elif opt == "--tickets":
            tickets = int(arg)
        elif opt == "--cipherlist":
            ciphers.append(arg)
        elif opt == "--request-pha":
            request_pha = True
        elif opt == "--require-pha":
            require_pha = True
        else:
            assert(False)

    # when no names provided, don't return array
    if not alpn:
        alpn = None
    if (psk and not psk_ident) or (not psk and psk_ident):
        printError("PSK and IDENTITY must be set together")
    if not argv:
        printError("Missing address")
    if len(argv)>1:
        printError("Too many arguments")
    #Split address into hostname/port tuple
    address = argv[0]
    address = address.split(":")
    if len(address) != 2:
        raise SyntaxError("Must specify <host>:<port>")
    address = ( address[0], int(address[1]) )

    # Populate the return list
    retList = [address]
    if "k" in argString:
        retList.append(privateKey)
    if "c" in argString:
        retList.append(cert_chain)
        retList.append(virtual_hosts)
    if "u" in argString:
        retList.append(username)
    if "p" in argString:
        retList.append(password)
    if "t" in argString:
        retList.append(tacks)
    if "v" in argString:
        retList.append(verifierDB)
    if "d" in argString:
        retList.append(directory)
    if "reqcert" in flagsList:
        retList.append(reqCert)
    if "l" in argString:
        retList.append(expLabel)
    if "L" in argString:
        retList.append(expLength)
    if "a" in argString:
        retList.append(alpn)
    if "param=" in flagsList:
        retList.append(dhparam)
    if "psk=" in flagsList:
        retList.append(psk)
    if "psk-ident=" in flagsList:
        retList.append(psk_ident)
    if "psk-sha384" in flagsList:
        retList.append(psk_hash)
    if "resumption" in flagsList:
        retList.append(resumption)
    if "ssl3" in flagsList:
        retList.append(ssl3)
    if "max-ver=" in flagsList:
        retList.append(max_ver)
    if "tickets=" in flagsList:
        retList.append(tickets)
    if "cipherlist=" in flagsList:
        retList.append(ciphers)
    if "request-pha" in flagsList:
        retList.append(request_pha)
    if "require-pha" in flagsList:
        retList.append(require_pha)
    return retList


def printGoodConnection(connection, seconds):
    print("  Handshake time: %.3f seconds" % seconds)
    print("  Version: %s" % connection.getVersionName())
    print("  Cipher: %s %s" % (connection.getCipherName(), 
        connection.getCipherImplementation()))
    print("  Ciphersuite: {0}".\
            format(CipherSuite.ietfNames[connection.session.cipherSuite]))
    if connection.session.srpUsername:
        print("  Client SRP username: %s" % connection.session.srpUsername)
    if connection.session.clientCertChain:
        print("  Client X.509 SHA1 fingerprint: %s" % 
            connection.session.clientCertChain.getFingerprint())
    else:
        print("  No client certificate provided by peer")
    if connection.session.serverCertChain:
        print("  Server X.509 SHA1 fingerprint: %s" % 
            connection.session.serverCertChain.getFingerprint())
    if connection.version >= (3, 3) and connection.serverSigAlg is not None:
        scheme = SignatureScheme.toRepr(connection.serverSigAlg)
        if scheme is None:
            scheme = "{1}+{0}".format(
                HashAlgorithm.toStr(connection.serverSigAlg[0]),
                SignatureAlgorithm.toStr(connection.serverSigAlg[1]))
        print("  Key exchange signature: {0}".format(scheme))
    if connection.ecdhCurve is not None:
        print("  Group used for key exchange: {0}".format(\
                GroupName.toStr(connection.ecdhCurve)))
    if connection.dhGroupSize is not None:
        print("  DH group size: {0} bits".format(connection.dhGroupSize))
    if connection.session.serverName:
        print("  SNI: %s" % connection.session.serverName)
    if connection.session.tackExt:   
        if connection.session.tackInHelloExt:
            emptyStr = "\n  (via TLS Extension)"
        else:
            emptyStr = "\n  (via TACK Certificate)" 
        print("  TACK: %s" % emptyStr)
        print(str(connection.session.tackExt))
    if connection.session.appProto:
        print("  Application Layer Protocol negotiated: {0}".format(
            connection.session.appProto.decode('utf-8')))
    print("  Next-Protocol Negotiated: %s" % connection.next_proto) 
    print("  Encrypt-then-MAC: {0}".format(connection.encryptThenMAC))
    print("  Extended Master Secret: {0}".format(
                                               connection.extendedMasterSecret))

def printExporter(connection, expLabel, expLength):
    if expLabel is None:
        return
    expLabel = bytearray(expLabel, "utf-8")
    exp = connection.keyingMaterialExporter(expLabel, expLength)
    exp = b2a_hex(exp).upper()
    print("  Exporter label: {0}".format(expLabel))
    print("  Exporter length: {0}".format(expLength))
    print("  Keying material: {0}".format(exp))

    
def clientCmd(argv):
    (address, privateKey, cert_chain, virtual_hosts, username, password,
            expLabel,
            expLength, alpn, psk, psk_ident, psk_hash, resumption, ssl3,
            max_ver, cipherlist) = \
        handleArgs(argv, "kcuplLa", ["psk=", "psk-ident=", "psk-sha384",
                                     "resumption", "ssl3", "max-ver=",
                                     "cipherlist="])
        
    if (cert_chain and not privateKey) or (not cert_chain and privateKey):
        raise SyntaxError("Must specify CERT and KEY together")
    if (username and not password) or (not username and password):
        raise SyntaxError("Must specify USER with PASS")
    if cert_chain and username:
        raise SyntaxError("Can use SRP or client cert for auth, not both")
    if expLabel is not None and not expLabel:
        raise ValueError("Label must be non-empty")

    #Connect to server
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    sock.connect(address)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    connection = TLSConnection(sock)
    
    settings = HandshakeSettings()
    if psk:
        settings.pskConfigs = [(psk_ident, psk, psk_hash)]
    settings.useExperimentalTackExtension = True
    if ssl3:
        settings.minVersion = (3, 0)
    if max_ver:
        settings.maxVersion = max_ver
    if cipherlist:
        settings.cipherNames = [item for cipher in cipherlist
                                for item in cipher.split(',')]
    try:
        start = time_stamp()
        if username and password:
            connection.handshakeClientSRP(username, password, 
                settings=settings, serverName=address[0])
        else:
            connection.handshakeClientCert(cert_chain, privateKey,
                settings=settings, serverName=address[0], alpn=alpn)
        stop = time_stamp()
        print("Handshake success")        
    except TLSLocalAlert as a:
        if a.description == AlertDescription.user_canceled:
            print(str(a))
        else:
            raise
        sys.exit(-1)
    except TLSRemoteAlert as a:
        if a.description == AlertDescription.unknown_psk_identity:
            if username:
                print("Unknown username")
            else:
                raise
        elif a.description == AlertDescription.bad_record_mac:
            if username:
                print("Bad username or password")
            else:
                raise
        elif a.description == AlertDescription.handshake_failure:
            print("Unable to negotiate mutually acceptable parameters")
        else:
            raise
        sys.exit(-1)
    printGoodConnection(connection, stop-start)
    printExporter(connection, expLabel, expLength)
    session = connection.session
    connection.send(b"GET / HTTP/1.0\r\n\r\n")
    while True:
        try:
            r = connection.recv(10240)
            if not r:
                break
        except socket.timeout:
            break
        except TLSAbruptCloseError:
            break
    connection.close()
    # we're expecting an abrupt close error which marks the session as
    # unresumable, override it
    session.resumable = True

    print("Received {0} ticket[s]".format(len(connection.tickets)))
    assert connection.tickets is session.tickets

    if not session.tickets:
        return

    if not resumption:
        return

    print("Trying resumption handshake")

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    sock.connect(address)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    connection = TLSConnection(sock)

    try:
        start = time_stamp()
        connection.handshakeClientCert(serverName=address[0], alpn=alpn,
            session=session)
        stop = time_stamp()
        print("Handshake success")
    except TLSLocalAlert as a:
        if a.description == AlertDescription.user_canceled:
            print(str(a))
        else:
            raise
        sys.exit(-1)
    except TLSRemoteAlert as a:
        if a.description == AlertDescription.unknown_psk_identity:
            if username:
                print("Unknown username")
            else:
                raise
        elif a.description == AlertDescription.bad_record_mac:
            if username:
                print("Bad username or password")
            else:
                raise
        elif a.description == AlertDescription.handshake_failure:
            print("Unable to negotiate mutually acceptable parameters")
        else:
            raise
        sys.exit(-1)
    printGoodConnection(connection, stop-start)
    printExporter(connection, expLabel, expLength)
    connection.close()


def serverCmd(argv):
    (address, privateKey, cert_chain, virtual_hosts, tacks, verifierDB,
            directory, reqCert,
            expLabel, expLength, dhparam, psk, psk_ident, psk_hash, ssl3,
            max_ver, tickets, cipherlist, request_pha, require_pha) = \
        handleArgs(argv, "kctvdlL",
                   ["reqcert", "param=", "psk=",
                    "psk-ident=", "psk-sha384", "ssl3", "max-ver=",
                    "tickets=", "cipherlist=", "request-pha", "require-pha"])


    if (cert_chain and not privateKey) or (not cert_chain and privateKey):
        raise SyntaxError("Must specify CERT and KEY together")
    if tacks and not cert_chain:
        raise SyntaxError("Must specify CERT with Tacks")
    
    print("I am an HTTPS test server, I will listen on %s:%d" % 
            (address[0], address[1]))    
    if directory:
        os.chdir(directory)
    print("Serving files from %s" % os.getcwd())
    
    if cert_chain and privateKey:
        print("Using certificate and private key...")
    if verifierDB:
        print("Using verifier DB...")
    if tacks:
        print("Using Tacks...")
    if reqCert:
        print("Asking for client certificates...")
        
    #############
    sessionCache = SessionCache()
    username = None
    sni = None
    if is_valid_hostname(address[0]):
        sni = address[0]
    settings = HandshakeSettings()
    settings.useExperimentalTackExtension=True
    settings.dhParams = dhparam
    if tickets is not None:
        settings.ticket_count = tickets
    if psk:
        settings.pskConfigs = [(psk_ident, psk, psk_hash)]
    settings.ticketKeys = [getRandomBytes(32)]
    if ssl3:
        settings.minVersion = (3, 0)
    if max_ver:
        settings.maxVersion = max_ver
    settings.virtual_hosts = virtual_hosts
    if cipherlist:
        settings.cipherNames = [item for cipher in cipherlist
                                for item in cipher.split(',')]

    class MySimpleHTTPHandler(SimpleHTTPRequestHandler, object):
        """Buffer the header and body of HTTP message."""
        wbufsize = -1

        def do_GET(self):
            """Simple override to send KeyUpdate to client."""
            if self.path.startswith('/keyupdate'):
                for i in self.connection.send_keyupdate_request(
                        KeyUpdateMessageType.update_requested):
                    if i in (0, 1):
                        continue
                    else:
                        raise ValueError("Invalid return from "
                                         "send_keyupdate_request")
            if self.path.startswith('/secret') and not request_pha:
                try:
                    for i in self.connection.request_post_handshake_auth():
                        pass
                except ValueError:
                    self.wfile.write(b'HTTP/1.0 401 Certificate authentication'
                                     b' required\r\n')
                    self.wfile.write(b'Connection: close\r\n')
                    self.wfile.write(b'Content-Length: 0\r\n\r\n')
                    return
                self.connection.read(0, 0)
                if self.connection.session.clientCertChain:
                    print("   Got client certificate in post-handshake auth: "
                          "{0}".format(self.connection.session
                                       .clientCertChain.getFingerprint()))
                else:
                    print("   No certificate from client received")
                    self.wfile.write(b'HTTP/1.0 401 Certificate authentication'
                                     b' required\r\n')
                    self.wfile.write(b'Connection: close\r\n')
                    self.wfile.write(b'Content-Length: 0\r\n\r\n')
                    return
            return super(MySimpleHTTPHandler, self).do_GET()

    class MyHTTPServer(ThreadingMixIn, TLSSocketServerMixIn, HTTPServer):
        def handshake(self, connection):
            print("About to handshake...")
            activationFlags = 0
            if tacks:
                if len(tacks) == 1:
                    activationFlags = 1
                elif len(tacks) == 2:
                    activationFlags = 3

            try:
                start = time_stamp()
                connection.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,
                                      1)
                connection.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                                      struct.pack('ii', 1, 5))
                connection.client_cert_required = require_pha
                connection.handshakeServer(certChain=cert_chain,
                                              privateKey=privateKey,
                                              verifierDB=verifierDB,
                                              tacks=tacks,
                                              activationFlags=activationFlags,
                                              sessionCache=sessionCache,
                                              settings=settings,
                                              nextProtos=[b"http/1.1"],
                                              alpn=[bytearray(b'http/1.1')],
                                              reqCert=reqCert,
                                              sni=sni)
                                              # As an example (does not work here):
                                              #nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"])
                try:
                    if request_pha:
                        for i in connection.request_post_handshake_auth():
                            pass
                except ValueError:
                    # if we can't do PHA, we can't do it
                    pass
                stop = time_stamp()
            except TLSRemoteAlert as a:
                if a.description == AlertDescription.user_canceled:
                    print(str(a))
                    return False
                else:
                    raise
            except TLSLocalAlert as a:
                if a.description == AlertDescription.unknown_psk_identity:
                    if username:
                        print("Unknown username")
                        return False
                    else:
                        raise
                elif a.description == AlertDescription.bad_record_mac:
                    if username:
                        print("Bad username or password")
                        return False
                    else:
                        raise
                elif a.description == AlertDescription.handshake_failure:
                    print("Unable to negotiate mutually acceptable parameters")
                    return False
                else:
                    raise

            connection.ignoreAbruptClose = True
            printGoodConnection(connection, stop-start)
            printExporter(connection, expLabel, expLength)
            return True

    httpd = MyHTTPServer(address, MySimpleHTTPHandler)
    httpd.serve_forever()


if __name__ == '__main__':
    if len(sys.argv) < 2:
        printUsage("Missing command")
    elif sys.argv[1] == "client"[:len(sys.argv[1])]:
        clientCmd(sys.argv[2:])
    elif sys.argv[1] == "server"[:len(sys.argv[1])]:
        serverCmd(sys.argv[2:])
    else:
        printUsage("Unknown command: %s" % sys.argv[1])
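
The option handling in handleArgs() can be tried in isolation, without tlslite installed. The sketch below is not part of the script: parse() is a hypothetical, trimmed-down stand-in that only knows -c, -k, --ssl3 and --cipherlist, and the helper names to_getopt_string, flatten_ciphers and split_address are made up for illustration. It reproduces three techniques the script uses: building the getopt string by appending ":" to every short option, flattening repeated comma-separated --cipherlist values, and splitting HOST:PORT into a tuple.

```python
import getopt


def to_getopt_string(arg_string):
    """Turn "abc" into "a:b:c:" so every short option takes a value."""
    return ":".join(arg_string) + ":"


def flatten_ciphers(cipherlist):
    """Flatten repeated, comma-separated --cipherlist values into one list."""
    return [item for cipher in cipherlist for item in cipher.split(",")]


def split_address(text):
    """Split "host:port" into a (host, port) tuple."""
    parts = text.split(":")
    if len(parts) != 2:
        raise ValueError("Must specify <host>:<port>")
    return parts[0], int(parts[1])


def parse(argv):
    """Hypothetical, trimmed-down stand-in for handleArgs()."""
    opts, rest = getopt.getopt(argv, to_getopt_string("ck"),
                               ["ssl3", "cipherlist="])
    # --cipherlist may be given several times; collect every occurrence
    ciphers = [arg for opt, arg in opts if opt == "--cipherlist"]
    ssl3 = any(opt == "--ssl3" for opt, _ in opts)
    if len(rest) != 1:
        raise ValueError("Expected exactly one HOST:PORT argument")
    return split_address(rest[0]), ssl3, flatten_ciphers(ciphers)


if __name__ == "__main__":
    print(to_getopt_string("kcuplLa"))
    print(parse(["-c", "cert.pem", "--cipherlist", "aes128ccm,3des",
                 "--cipherlist", "aes256", "--ssl3", "localhost:4433"]))
```

Because the address is split on every ":", an IPv6 literal such as ::1 would be rejected by this approach; the script itself only opens AF_INET sockets, so that limitation matches its current behaviour.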
