From: Rainer Jung Date: Tue, 16 Jun 2026 19:17:22 +0000 (+0000) Subject: pytest mod_md: fix python crypto deprecations. X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=4cf67ce32deda0de12ccdb9a77f7221e90543acf;p=thirdparty%2Fapache%2Fhttpd.git pytest mod_md: fix python crypto deprecations. Use the modern crypto API instead of OpenSSL things. Merge from icing/md: https://github.com/icing/mod_md/commit/41496f9a7333c4e0c46b54ab08f04be8433caafd git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1935439 13f79535-47bb-0310-9956-ffa450edef68 --- diff --git a/test/modules/md/md_cert_util.py b/test/modules/md/md_cert_util.py index 6cd034a02b..796af220df 100755 --- a/test/modules/md/md_cert_util.py +++ b/test/modules/md/md_cert_util.py @@ -11,7 +11,12 @@ from datetime import timedelta from http.client import HTTPConnection from urllib.parse import urlparse +from cryptography.hazmat._oid import ExtensionOID +from cryptography.hazmat.bindings._rust import ObjectIdentifier +from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, load_pem_private_key + from cryptography import x509 +from cryptography.x509 import DNSName, ExtensionNotFound SEC_PER_DAY = 24 * 60 * 60 @@ -21,7 +26,6 @@ log = logging.getLogger(__name__) class MDCertUtil(object): # Utility class for inspecting certificates in test cases - # Uses PyOpenSSL: https://pyopenssl.org/en/stable/index.html @classmethod def load_server_cert(cls, host_ip, host_port, host_name, tls=None, ciphers=None): @@ -42,12 +46,12 @@ class MDCertUtil(object): connection.setblocking(1) connection.set_tlsext_host_name(host_name.encode('utf-8')) connection.do_handshake() - peer_cert = connection.get_peer_certificate() - return MDCertUtil(None, cert=peer_cert) + ossl_cert = connection.get_peer_certificate() + return MDCertUtil(None, cert=ossl_cert.to_cryptography()) @classmethod def parse_pem_cert(cls, text): - cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, text.encode('utf-8')) + cert = x509.load_pem_x509_certificate(text.encode('utf-8')) return MDCertUtil(None, cert=cert) @classmethod @@ -72,24 +76,26 @@ class MDCertUtil(object): return None def __init__(self, cert_path, cert=None): + self.cert = cert + self.privkey = None if cert_path is not None: self.cert_path = cert_path # load certificate and private key if cert_path.startswith("http"): - cert_data = self.get_plain(cert_path, 1) - else: - cert_data = MDCertUtil._load_binary_file(cert_path) - - for file_type in (OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1): - try: - self.cert = OpenSSL.crypto.load_certificate(file_type, cert_data) - except Exception as error: - self.error = error - if cert is not None: - self.cert = cert - - if self.cert is None: - raise self.error + assert False + try: + with open(cert_path) as fd: + cert = x509.load_pem_x509_certificate("".join(fd.readlines()).encode()) + except Exception as error: + self.error = error + if cert is not None: + self.cert = cert + if self.cert is None: + raise self.error + + def add_privkey(self, path, password=None): + with open(path) as fd: + self.privkey = load_pem_private_key("".join(fd.readlines()).encode(), password=password) def get_issuer(self): return self.cert.get_issuer() @@ -97,21 +103,20 @@ class MDCertUtil(object): def get_serial(self): # the string representation of a serial number is not unique. Some # add leading 0s to align with word boundaries. - return ("%lx" % (self.cert.get_serial_number())).upper() + return ("%lx" % (self.cert.serial_number)).upper() @staticmethod def _get_serial(cert) -> int: if isinstance(cert, x509.Certificate): return cert.serial_number if isinstance(cert, MDCertUtil): - return cert.get_serial_number() - elif isinstance(cert, OpenSSL.crypto.X509): - return cert.get_serial_number() + return cert.cert.serial_number elif isinstance(cert, str): # assume a hex number return int(cert, 16) elif isinstance(cert, int): return cert + assert False, f'{cert}' return 0 def get_serial_number(self): @@ -121,89 +126,33 @@ class MDCertUtil(object): return self._get_serial(self.cert) == self._get_serial(other) def get_not_before(self): - tsp = self.cert.get_notBefore() - return self._parse_tsp(tsp) + try: + return self.cert.not_valid_before_utc + except AttributeError: + return self.cert.not_valid_before def get_not_after(self): - tsp = self.cert.get_notAfter() - return self._parse_tsp(tsp) - - def get_cn(self): - return self.cert.get_subject().CN + try: + return self.cert.not_valid_after_utc + except AttributeError: + return self.cert.not_valid_after def get_key_length(self): - return self.cert.get_pubkey().bits() + return self.cert.public_key().key_size def get_san_list(self): - text = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_TEXT, self.cert).decode("utf-8") - m = re.search(r"X509v3 Subject Alternative Name:(\s+critical)?\s*(.*)", text) - sans_list = [] - if m: - sans_list = m.group(2).split(",") - - def _strip_prefix(s): - return s.split(":")[1] if s.strip().startswith("DNS:") else s.strip() - return list(map(_strip_prefix, sans_list)) + sans = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName) + return sans.value.get_values_for_type(DNSName) def get_must_staple(self): - text = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_TEXT, self.cert).decode("utf-8") - m = re.search(r"1.3.6.1.5.5.7.1.24:\s*\n\s*0....", text) - if not m: - # Newer openssl versions print this differently - m = re.search(r"TLS Feature:\s*\n\s*status_request\s*\n", text) - return m is not None + try: + self.cert.extensions.get_extension_for_oid(ExtensionOID.TLS_FEATURE) + return True + except ExtensionNotFound: + return False @classmethod def validate_privkey(cls, privkey_path, passphrase=None): - privkey_data = cls._load_binary_file(privkey_path) - if passphrase: - privkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey_data, passphrase) - else: - privkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey_data) - return privkey.check() - - def validate_cert_matches_priv_key(self, privkey_path): - # Verifies that the private key and cert match. - privkey_data = MDCertUtil._load_binary_file(privkey_path) - privkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey_data) - context = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD) - context.use_privatekey(privkey) - context.use_certificate(self.cert) - context.check_privatekey() - - # --------- _utils_ --------- - - def astr(self, s): - return s.decode('utf-8') - - def _parse_tsp(self, tsp): - # timestampss returned by PyOpenSSL are bytes - # parse date and time part - s = ("%s-%s-%s %s:%s:%s" % (self.astr(tsp[0:4]), self.astr(tsp[4:6]), self.astr(tsp[6:8]), - self.astr(tsp[8:10]), self.astr(tsp[10:12]), self.astr(tsp[12:14]))) - timestamp = datetime.strptime(s, '%Y-%m-%d %H:%M:%S') - # adjust timezone - tz_h, tz_m = 0, 0 - m = re.match(r"([+\-]\d{2})(\d{2})", self.astr(tsp[14:])) - if m: - tz_h, tz_m = int(m.group(1)), int(m.group(2)) if tz_h > 0 else -1 * int(m.group(2)) - return timestamp.replace(tzinfo=self.FixedOffset(60 * tz_h + tz_m)) - - @classmethod - def _load_binary_file(cls, path): - with open(path, mode="rb") as file: - return file.read() - - class FixedOffset(tzinfo): - - def __init__(self, offset): - self.__offset = timedelta(minutes=offset) - - def utcoffset(self, dt): - return self.__offset - - def tzname(self, dt): - return None - - def dst(self, dt): - return timedelta(0) + with open(privkey_path) as fd: + privkey = load_pem_private_key("".join(fd.readlines()).encode(), password=passphrase) + return privkey is not None diff --git a/test/modules/md/md_env.py b/test/modules/md/md_env.py index acc8417b14..7b5f0f9e56 100755 --- a/test/modules/md/md_env.py +++ b/test/modules/md/md_env.py @@ -340,7 +340,7 @@ class MDTestEnv(HttpdTestEnv): md = self.get_md_status(domain) assert md assert 'state' in md, "md is unexpected: {0}".format(md) - assert md['state'] is MDTestEnv.MD_S_COMPLETE, f"unexpected state: {md['state']}" + assert md['state'] is MDTestEnv.MD_S_COMPLETE, f"unexpected state: {md}" pkey_file = self.store_domain_file(domain, self.pkey_fname(pkey)) cert_file = self.store_domain_file(domain, self.cert_fname(pkey)) r = self.run(['ls', os.path.dirname(pkey_file)]) @@ -359,7 +359,7 @@ class MDTestEnv(HttpdTestEnv): # check private key, validate certificate, etc MDCertUtil.validate_privkey(self.store_domain_file(domain, 'privkey.pem')) cert = MDCertUtil(self.store_domain_file(domain, 'pubcert.pem')) - cert.validate_cert_matches_priv_key(self.store_domain_file(domain, 'privkey.pem')) + cert.add_privkey(self.store_domain_file(domain, 'privkey.pem')) # No longer check CN, it may not be set or is not trusted anyway # assert cert.get_cn() == domain, f'CN: expected "{domain}", got {cert.get_cn()}' # check SANs diff --git a/test/modules/md/test_502_acmev2_drive.py b/test/modules/md/test_502_acmev2_drive.py index 484e4d4bd9..c3a6854b2b 100644 --- a/test/modules/md/test_502_acmev2_drive.py +++ b/test/modules/md/test_502_acmev2_drive.py @@ -395,7 +395,7 @@ class TestDrivev2: # check new cert env.check_md_credentials([name, "test." + domain]) new_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem')) - assert not old_cert.same_serial_as(new_cert.get_serial) + assert not old_cert.same_serial_as(new_cert.get_serial()) @pytest.mark.parametrize("renew_window,test_data_list", [ ("14d", [ @@ -550,4 +550,4 @@ class TestDrivev2: # check: key file is encrypted PEM md = env.a2md(["list", name]).json['output'][0] acc = md['ca']['account'] - MDCertUtil.validate_privkey(env.path_account_key(acc), lambda *args: encrypt_key) + MDCertUtil.validate_privkey(env.path_account_key(acc), encrypt_key) diff --git a/test/modules/md/test_702_auto.py b/test/modules/md/test_702_auto.py index 1536d39624..b4d7a0b051 100644 --- a/test/modules/md/test_702_auto.py +++ b/test/modules/md/test_702_auto.py @@ -208,8 +208,8 @@ class TestAutov2: # check temporary cert from server cert2 = MDCertUtil(env.path_fallback_cert(domain)) assert cert1.same_serial_as(cert2), \ - "Unexpected temporary certificate on vhost %s. Expected cn: %s , "\ - "but found cn: %s" % (name_a, cert2.get_cn(), cert1.get_cn()) + f"Unexpected temporary certificate on vhost {name_a}." \ + f" Expected cn: {cert2}, but found cn: {cert1}" # test case: drive MD with only invalid challenges, domains should stay 503'd def test_md_702_006(self, env): diff --git a/test/modules/md/test_800_must_staple.py b/test/modules/md/test_800_must_staple.py index 8433ca8cee..d2d8225dd6 100644 --- a/test/modules/md/test_800_must_staple.py +++ b/test/modules/md/test_800_must_staple.py @@ -1,4 +1,5 @@ # test mod_md must-staple support +import time import pytest from .md_conf import MDConf @@ -21,7 +22,8 @@ class TestMustStaple: @pytest.fixture(autouse=True, scope='function') def _method_scope(self, env, request): - self.domain = env.get_class_domain(self.__class__) + env.clear_store() + self.domain = env.get_request_domain(request) def configure_httpd(self, env, domain, add_lines=""): conf = MDConf(env, admin="admin@" + domain) @@ -43,6 +45,7 @@ class TestMustStaple: def test_md_800_002(self, env): self.configure_httpd(env, self.domain, "MDMustStaple off") assert env.apache_restart() == 0, f'{env.apachectl_stderr}' + assert env.await_completion([self.domain]) env.check_md_complete(self.domain) cert1 = MDCertUtil(env.store_domain_file(self.domain, 'pubcert.pem')) assert not cert1.get_must_staple()