Skip to main content
  • Home
  • login
  • Browse the archive

    swh mirror partner logo
swh logo
SoftwareHeritage
Software
Heritage
Mirror
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

  • a80c21a
  • /
  • src
  • /
  • rust
  • /
  • src
  • /
  • asn1.rs
Raw File
Permalinks

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
content badge Iframe embedding
swh:1:cnt:522e21ac622222980698076cbf69019350d6e9d1
directory badge Iframe embedding
swh:1:dir:a911de597da6a3641f36085fa4cd396e548572c2
asn1.rs
// This file is dual licensed under the terms of the Apache License, Version
// 2.0, and the BSD License. See the LICENSE file in the root of this repository
// for complete details.

use crate::x509::Name;
use pyo3::basic::CompareOp;
use pyo3::types::IntoPyDict;
use pyo3::ToPyObject;

pub enum PyAsn1Error {
    Asn1Parse(asn1::ParseError),
    Asn1Write(asn1::WriteError),
    Py(pyo3::PyErr),
}

impl From<asn1::ParseError> for PyAsn1Error {
    fn from(e: asn1::ParseError) -> PyAsn1Error {
        PyAsn1Error::Asn1Parse(e)
    }
}

impl From<asn1::WriteError> for PyAsn1Error {
    fn from(e: asn1::WriteError) -> PyAsn1Error {
        PyAsn1Error::Asn1Write(e)
    }
}

impl From<pyo3::PyErr> for PyAsn1Error {
    fn from(e: pyo3::PyErr) -> PyAsn1Error {
        PyAsn1Error::Py(e)
    }
}

impl From<pyo3::PyDowncastError<'_>> for PyAsn1Error {
    fn from(e: pyo3::PyDowncastError<'_>) -> PyAsn1Error {
        PyAsn1Error::Py(e.into())
    }
}

impl From<pem::PemError> for PyAsn1Error {
    fn from(e: pem::PemError) -> PyAsn1Error {
        PyAsn1Error::Py(pyo3::exceptions::PyValueError::new_err(format!(
            "Unable to load PEM file. See https://cryptography.io/en/latest/faq/#why-can-t-i-import-my-pem-file for more details. {:?}",
            e
        )))
    }
}

impl From<PyAsn1Error> for pyo3::PyErr {
    fn from(e: PyAsn1Error) -> pyo3::PyErr {
        match e {
            PyAsn1Error::Asn1Parse(asn1_error) => pyo3::exceptions::PyValueError::new_err(format!(
                "error parsing asn1 value: {:?}",
                asn1_error
            )),
            PyAsn1Error::Asn1Write(asn1::WriteError::AllocationError) => {
                pyo3::exceptions::PyMemoryError::new_err(
                    "failed to allocate memory while performing ASN.1 serialization",
                )
            }
            PyAsn1Error::Py(py_error) => py_error,
        }
    }
}

impl PyAsn1Error {
    pub(crate) fn add_location(self, loc: asn1::ParseLocation) -> Self {
        match self {
            PyAsn1Error::Py(e) => PyAsn1Error::Py(e),
            PyAsn1Error::Asn1Parse(e) => PyAsn1Error::Asn1Parse(e.add_location(loc)),
            PyAsn1Error::Asn1Write(e) => PyAsn1Error::Asn1Write(e),
        }
    }
}

// The primary purpose of this alias is for brevity to keep function signatures
// to a single-line as a work around for coverage issues. See
// https://github.com/pyca/cryptography/pull/6173
pub(crate) type PyAsn1Result<T = pyo3::PyObject> = Result<T, PyAsn1Error>;

pub(crate) fn py_oid_to_oid(py_oid: &pyo3::PyAny) -> pyo3::PyResult<asn1::ObjectIdentifier> {
    Ok(py_oid
        .downcast::<pyo3::PyCell<crate::oid::ObjectIdentifier>>()?
        .borrow()
        .oid
        .clone())
}

pub(crate) fn oid_to_py_oid<'p>(
    py: pyo3::Python<'p>,
    oid: &asn1::ObjectIdentifier,
) -> pyo3::PyResult<&'p pyo3::PyAny> {
    Ok(pyo3::Py::new(py, crate::oid::ObjectIdentifier { oid: oid.clone() })?.into_ref(py))
}

#[derive(asn1::Asn1Read)]
struct AlgorithmIdentifier<'a> {
    _oid: asn1::ObjectIdentifier,
    _params: Option<asn1::Tlv<'a>>,
}

#[derive(asn1::Asn1Read)]
struct Spki<'a> {
    _algorithm: AlgorithmIdentifier<'a>,
    data: asn1::BitString<'a>,
}

#[pyo3::prelude::pyfunction]
fn parse_spki_for_data(py: pyo3::Python<'_>, data: &[u8]) -> Result<pyo3::PyObject, PyAsn1Error> {
    let spki = asn1::parse_single::<Spki<'_>>(data)?;
    if spki.data.padding_bits() != 0 {
        return Err(pyo3::exceptions::PyValueError::new_err("Invalid public key encoding").into());
    }

    Ok(pyo3::types::PyBytes::new(py, spki.data.as_bytes()).to_object(py))
}

#[derive(asn1::Asn1Read, asn1::Asn1Write)]
struct DssSignature<'a> {
    r: asn1::BigUint<'a>,
    s: asn1::BigUint<'a>,
}

pub(crate) fn big_byte_slice_to_py_int<'p>(
    py: pyo3::Python<'p>,
    v: &'_ [u8],
) -> pyo3::PyResult<&'p pyo3::PyAny> {
    let int_type = py.get_type::<pyo3::types::PyLong>();
    let kwargs = [("signed", true)].into_py_dict(py);
    int_type.call_method("from_bytes", (v, "big"), Some(kwargs))
}

#[pyo3::prelude::pyfunction]
fn decode_dss_signature(py: pyo3::Python<'_>, data: &[u8]) -> Result<pyo3::PyObject, PyAsn1Error> {
    let sig = asn1::parse_single::<DssSignature<'_>>(data)?;

    Ok((
        big_byte_slice_to_py_int(py, sig.r.as_bytes())?,
        big_byte_slice_to_py_int(py, sig.s.as_bytes())?,
    )
        .to_object(py))
}

pub(crate) fn py_uint_to_big_endian_bytes<'p>(
    py: pyo3::Python<'p>,
    v: &'p pyo3::types::PyLong,
) -> pyo3::PyResult<&'p [u8]> {
    let zero = (0).to_object(py);
    if v.rich_compare(zero, CompareOp::Lt)?.is_true()? {
        return Err(pyo3::exceptions::PyValueError::new_err(
            "Negative integers are not supported",
        ));
    }

    // Round the length up so that we prefix an extra \x00. This ensures that
    // integers that'd have the high bit set in their first octet are not
    // encoded as negative in DER.
    let n = v.call_method0("bit_length")?.extract::<usize>()? / 8 + 1;
    v.call_method1("to_bytes", (n, "big"))?.extract()
}

pub(crate) fn encode_der_data<'p>(
    py: pyo3::Python<'p>,
    pem_tag: String,
    data: Vec<u8>,
    encoding: &'p pyo3::PyAny,
) -> PyAsn1Result<&'p pyo3::types::PyBytes> {
    let encoding_class = py
        .import("cryptography.hazmat.primitives.serialization")?
        .getattr(crate::intern!(py, "Encoding"))?;

    if encoding == encoding_class.getattr(crate::intern!(py, "DER"))? {
        Ok(pyo3::types::PyBytes::new(py, &data))
    } else if encoding == encoding_class.getattr(crate::intern!(py, "PEM"))? {
        Ok(pyo3::types::PyBytes::new(
            py,
            &pem::encode_config(
                &pem::Pem {
                    tag: pem_tag,
                    contents: data,
                },
                pem::EncodeConfig {
                    line_ending: pem::LineEnding::LF,
                },
            )
            .into_bytes(),
        ))
    } else {
        Err(
            pyo3::exceptions::PyTypeError::new_err("encoding must be Encoding.DER or Encoding.PEM")
                .into(),
        )
    }
}

#[pyo3::prelude::pyfunction]
fn encode_dss_signature(
    py: pyo3::Python<'_>,
    r: &pyo3::types::PyLong,
    s: &pyo3::types::PyLong,
) -> PyAsn1Result<pyo3::PyObject> {
    let sig = DssSignature {
        r: asn1::BigUint::new(py_uint_to_big_endian_bytes(py, r)?).unwrap(),
        s: asn1::BigUint::new(py_uint_to_big_endian_bytes(py, s)?).unwrap(),
    };
    let result = asn1::write_single(&sig)?;
    Ok(pyo3::types::PyBytes::new(py, &result).to_object(py))
}

#[pyo3::prelude::pyclass]
struct TestCertificate {
    #[pyo3(get)]
    not_before_tag: u8,
    #[pyo3(get)]
    not_after_tag: u8,
    #[pyo3(get)]
    issuer_value_tags: Vec<u8>,
    #[pyo3(get)]
    subject_value_tags: Vec<u8>,
}

#[derive(asn1::Asn1Read)]
struct Asn1Certificate<'a> {
    tbs_cert: TbsCertificate<'a>,
    _signature_alg: asn1::Sequence<'a>,
    _signature: asn1::BitString<'a>,
}

#[derive(asn1::Asn1Read)]
struct TbsCertificate<'a> {
    #[explicit(0)]
    _version: Option<u8>,
    _serial: asn1::BigUint<'a>,
    _signature_alg: asn1::Sequence<'a>,

    issuer: Name<'a>,
    validity: Validity<'a>,
    subject: Name<'a>,

    _spki: asn1::Sequence<'a>,
    #[implicit(1)]
    _issuer_unique_id: Option<asn1::BitString<'a>>,
    #[implicit(2)]
    _subject_unique_id: Option<asn1::BitString<'a>>,
    #[explicit(3)]
    _extensions: Option<asn1::Sequence<'a>>,
}

#[derive(asn1::Asn1Read)]
struct Validity<'a> {
    not_before: asn1::Tlv<'a>,
    not_after: asn1::Tlv<'a>,
}

fn parse_name_value_tags(rdns: &mut Name<'_>) -> Vec<u8> {
    let mut tags = vec![];
    for rdn in rdns.unwrap_read().clone() {
        let mut attributes = rdn.collect::<Vec<_>>();
        assert_eq!(attributes.len(), 1);

        tags.push(attributes.pop().unwrap().value.tag().as_u8().unwrap());
    }
    tags
}

#[pyo3::prelude::pyfunction]
fn test_parse_certificate(data: &[u8]) -> Result<TestCertificate, PyAsn1Error> {
    let mut asn1_cert = asn1::parse_single::<Asn1Certificate<'_>>(data)?;

    Ok(TestCertificate {
        not_before_tag: asn1_cert
            .tbs_cert
            .validity
            .not_before
            .tag()
            .as_u8()
            .unwrap(),
        not_after_tag: asn1_cert.tbs_cert.validity.not_after.tag().as_u8().unwrap(),
        issuer_value_tags: parse_name_value_tags(&mut asn1_cert.tbs_cert.issuer),
        subject_value_tags: parse_name_value_tags(&mut asn1_cert.tbs_cert.subject),
    })
}

pub(crate) fn create_submodule(py: pyo3::Python<'_>) -> pyo3::PyResult<&pyo3::prelude::PyModule> {
    let submod = pyo3::prelude::PyModule::new(py, "asn1")?;
    submod.add_wrapped(pyo3::wrap_pyfunction!(parse_spki_for_data))?;

    submod.add_wrapped(pyo3::wrap_pyfunction!(decode_dss_signature))?;
    submod.add_wrapped(pyo3::wrap_pyfunction!(encode_dss_signature))?;

    submod.add_wrapped(pyo3::wrap_pyfunction!(test_parse_certificate))?;

    Ok(submod)
}

#[cfg(test)]
mod tests {
    use super::PyAsn1Error;

    #[test]
    fn test_pyasn1error_from() {
        pyo3::prepare_freethreaded_python();
        pyo3::Python::with_gil(|py| {
            let e: PyAsn1Error = asn1::WriteError::AllocationError.into();
            assert!(matches!(
                e,
                PyAsn1Error::Asn1Write(asn1::WriteError::AllocationError)
            ));
            let py_e: pyo3::PyErr = e.into();
            assert!(py_e.is_instance::<pyo3::exceptions::PyMemoryError>(py));

            let e: PyAsn1Error = pyo3::PyDowncastError::new(py.None().as_ref(py), "abc").into();
            assert!(matches!(e, PyAsn1Error::Py(_)));
        })
    }

    #[test]
    fn test_pyasn1error_add_location() {
        let py_err = pyo3::PyErr::new::<pyo3::exceptions::PyValueError, _>("Error!");
        PyAsn1Error::Py(py_err).add_location(asn1::ParseLocation::Field("meh"));

        let asn1_write_err = asn1::WriteError::AllocationError;
        PyAsn1Error::Asn1Write(asn1_write_err).add_location(asn1::ParseLocation::Field("meh"));
    }
}

ENEA — Copyright (C), ENEA. License: GNU AGPLv3+.
Legal notes  ::  JavaScript license information ::  Web API

back to top