]> git.ipfire.org Git - thirdparty/apache/httpd.git/commitdiff
pytest mod_md: fix python crypto deprecations.
authorRainer Jung <rjung@apache.org>
Tue, 16 Jun 2026 19:17:22 +0000 (19:17 +0000)
committerRainer Jung <rjung@apache.org>
Tue, 16 Jun 2026 19:17:22 +0000 (19:17 +0000)
Use the modern crypto API instead of OpenSSL things.

Merge from icing/md:
https://github.com/icing/mod_md/commit/41496f9a7333c4e0c46b54ab08f04be8433caafd

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1935439 13f79535-47bb-0310-9956-ffa450edef68

test/modules/md/md_cert_util.py
test/modules/md/md_env.py
test/modules/md/test_502_acmev2_drive.py
test/modules/md/test_702_auto.py
test/modules/md/test_800_must_staple.py

index 6cd034a02b5825efe884a709724e9adf79466dd4..796af220dfb8a48cc518a76bf94c84dda2e9d085 100755 (executable)
@@ -11,7 +11,12 @@ from datetime import timedelta
 from http.client import HTTPConnection
 from urllib.parse import urlparse
 
+from cryptography.hazmat._oid import ExtensionOID
+from cryptography.hazmat.bindings._rust import ObjectIdentifier
+from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, load_pem_private_key
+
 from cryptography import x509
+from cryptography.x509 import DNSName, ExtensionNotFound
 
 SEC_PER_DAY = 24 * 60 * 60
 
@@ -21,7 +26,6 @@ log = logging.getLogger(__name__)
 
 class MDCertUtil(object):
     # Utility class for inspecting certificates in test cases
-    # Uses PyOpenSSL: https://pyopenssl.org/en/stable/index.html
 
     @classmethod
     def load_server_cert(cls, host_ip, host_port, host_name, tls=None, ciphers=None):
@@ -42,12 +46,12 @@ class MDCertUtil(object):
         connection.setblocking(1)
         connection.set_tlsext_host_name(host_name.encode('utf-8'))
         connection.do_handshake()
-        peer_cert = connection.get_peer_certificate()
-        return MDCertUtil(None, cert=peer_cert)
+        ossl_cert = connection.get_peer_certificate()
+        return MDCertUtil(None, cert=ossl_cert.to_cryptography())
 
     @classmethod
     def parse_pem_cert(cls, text):
-        cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, text.encode('utf-8'))
+        cert = x509.load_pem_x509_certificate(text.encode('utf-8'))
         return MDCertUtil(None, cert=cert)
 
     @classmethod
@@ -72,24 +76,26 @@ class MDCertUtil(object):
         return None
 
     def __init__(self, cert_path, cert=None):
+        self.cert = cert
+        self.privkey = None
         if cert_path is not None:
             self.cert_path = cert_path
             # load certificate and private key
             if cert_path.startswith("http"):
-                cert_data = self.get_plain(cert_path, 1)
-            else:
-                cert_data = MDCertUtil._load_binary_file(cert_path)
-
-            for file_type in (OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1):
-                try:
-                    self.cert = OpenSSL.crypto.load_certificate(file_type, cert_data)
-                except Exception as error:
-                    self.error = error
-        if cert is not None:
-            self.cert = cert
-
-        if self.cert is None:
-            raise self.error
+                assert False
+            try:
+                with open(cert_path) as fd:
+                    cert = x509.load_pem_x509_certificate("".join(fd.readlines()).encode())
+            except Exception as error:
+                self.error = error
+            if cert is not None:
+                self.cert = cert
+            if self.cert is None:
+                raise self.error
+
+    def add_privkey(self, path, password=None):
+        with open(path) as fd:
+            self.privkey = load_pem_private_key("".join(fd.readlines()).encode(), password=password)
 
     def get_issuer(self):
         return self.cert.get_issuer()
@@ -97,21 +103,20 @@ class MDCertUtil(object):
     def get_serial(self):
         # the string representation of a serial number is not unique. Some
         # add leading 0s to align with word boundaries.
-        return ("%lx" % (self.cert.get_serial_number())).upper()
+        return ("%lx" % (self.cert.serial_number)).upper()
 
     @staticmethod
     def _get_serial(cert) -> int:
         if isinstance(cert, x509.Certificate):
             return cert.serial_number
         if isinstance(cert, MDCertUtil):
-            return cert.get_serial_number()
-        elif isinstance(cert, OpenSSL.crypto.X509):
-            return cert.get_serial_number()
+            return cert.cert.serial_number
         elif isinstance(cert, str):
             # assume a hex number
             return int(cert, 16)
         elif isinstance(cert, int):
             return cert
+        assert False, f'{cert}'
         return 0
 
     def get_serial_number(self):
@@ -121,89 +126,33 @@ class MDCertUtil(object):
         return self._get_serial(self.cert) == self._get_serial(other)
 
     def get_not_before(self):
-        tsp = self.cert.get_notBefore()
-        return self._parse_tsp(tsp)
+        try:
+            return self.cert.not_valid_before_utc
+        except AttributeError:
+            return self.cert.not_valid_before
 
     def get_not_after(self):
-        tsp = self.cert.get_notAfter()
-        return self._parse_tsp(tsp)
-
-    def get_cn(self):
-        return self.cert.get_subject().CN
+        try:
+            return self.cert.not_valid_after_utc
+        except AttributeError:
+            return self.cert.not_valid_after
 
     def get_key_length(self):
-        return self.cert.get_pubkey().bits()
+        return self.cert.public_key().key_size
 
     def get_san_list(self):
-        text = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_TEXT, self.cert).decode("utf-8")
-        m = re.search(r"X509v3 Subject Alternative Name:(\s+critical)?\s*(.*)", text)
-        sans_list = []
-        if m:
-            sans_list = m.group(2).split(",")
-
-        def _strip_prefix(s):
-            return s.split(":")[1] if s.strip().startswith("DNS:") else s.strip()
-        return list(map(_strip_prefix, sans_list))
+        sans = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
+        return sans.value.get_values_for_type(DNSName)
 
     def get_must_staple(self):
-        text = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_TEXT, self.cert).decode("utf-8")
-        m = re.search(r"1.3.6.1.5.5.7.1.24:\s*\n\s*0....", text)
-        if not m:
-            # Newer openssl versions print this differently
-            m = re.search(r"TLS Feature:\s*\n\s*status_request\s*\n", text)
-        return m is not None
+        try:
+            self.cert.extensions.get_extension_for_oid(ExtensionOID.TLS_FEATURE)
+            return True
+        except ExtensionNotFound:
+            return False
 
     @classmethod
     def validate_privkey(cls, privkey_path, passphrase=None):
-        privkey_data = cls._load_binary_file(privkey_path)
-        if passphrase:
-            privkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey_data, passphrase)
-        else:
-            privkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey_data)
-        return privkey.check()
-
-    def validate_cert_matches_priv_key(self, privkey_path):
-        # Verifies that the private key and cert match.
-        privkey_data = MDCertUtil._load_binary_file(privkey_path)
-        privkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey_data)
-        context = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
-        context.use_privatekey(privkey)
-        context.use_certificate(self.cert)
-        context.check_privatekey()
-
-    # --------- _utils_ ---------
-
-    def astr(self, s):
-        return s.decode('utf-8')
-        
-    def _parse_tsp(self, tsp):
-        # timestampss returned by PyOpenSSL are bytes
-        # parse date and time part
-        s = ("%s-%s-%s %s:%s:%s" % (self.astr(tsp[0:4]), self.astr(tsp[4:6]), self.astr(tsp[6:8]),
-                                    self.astr(tsp[8:10]), self.astr(tsp[10:12]), self.astr(tsp[12:14])))
-        timestamp = datetime.strptime(s, '%Y-%m-%d %H:%M:%S')
-        # adjust timezone
-        tz_h, tz_m = 0, 0
-        m = re.match(r"([+\-]\d{2})(\d{2})", self.astr(tsp[14:]))
-        if m:
-            tz_h, tz_m = int(m.group(1)),  int(m.group(2)) if tz_h > 0 else -1 * int(m.group(2))
-        return timestamp.replace(tzinfo=self.FixedOffset(60 * tz_h + tz_m))
-
-    @classmethod
-    def _load_binary_file(cls, path):
-        with open(path, mode="rb") as file:
-            return file.read()
-
-    class FixedOffset(tzinfo):
-
-        def __init__(self, offset):
-            self.__offset = timedelta(minutes=offset)
-
-        def utcoffset(self, dt):
-            return self.__offset
-
-        def tzname(self, dt):
-            return None
-
-        def dst(self, dt):
-            return timedelta(0)
+        with open(privkey_path) as fd:
+            privkey = load_pem_private_key("".join(fd.readlines()).encode(), password=passphrase)
+            return privkey is not None
index acc8417b149ec4160e111396ce9f97ba21307e1d..7b5f0f9e5618f0f2cbb19988ae49b9fcebda319a 100755 (executable)
@@ -340,7 +340,7 @@ class MDTestEnv(HttpdTestEnv):
         md = self.get_md_status(domain)
         assert md
         assert 'state' in md, "md is unexpected: {0}".format(md)
-        assert md['state'] is MDTestEnv.MD_S_COMPLETE, f"unexpected state: {md['state']}"
+        assert md['state'] is MDTestEnv.MD_S_COMPLETE, f"unexpected state: {md}"
         pkey_file = self.store_domain_file(domain, self.pkey_fname(pkey))
         cert_file = self.store_domain_file(domain, self.cert_fname(pkey))
         r = self.run(['ls', os.path.dirname(pkey_file)])
@@ -359,7 +359,7 @@ class MDTestEnv(HttpdTestEnv):
         # check private key, validate certificate, etc
         MDCertUtil.validate_privkey(self.store_domain_file(domain, 'privkey.pem'))
         cert = MDCertUtil(self.store_domain_file(domain, 'pubcert.pem'))
-        cert.validate_cert_matches_priv_key(self.store_domain_file(domain, 'privkey.pem'))
+        cert.add_privkey(self.store_domain_file(domain, 'privkey.pem'))
         # No longer check CN, it may not be set or is not trusted anyway
         # assert cert.get_cn() == domain, f'CN: expected "{domain}", got {cert.get_cn()}'
         # check SANs
index 484e4d4bd95d2a6cf3ce24bbe7ee3f35661dc053..c3a6854b2be20f9291bb237f4604a45fa2700147 100644 (file)
@@ -395,7 +395,7 @@ class TestDrivev2:
         # check new cert
         env.check_md_credentials([name, "test." + domain])
         new_cert = MDCertUtil(env.store_domain_file(name, 'pubcert.pem'))
-        assert not old_cert.same_serial_as(new_cert.get_serial)
+        assert not old_cert.same_serial_as(new_cert.get_serial())
 
     @pytest.mark.parametrize("renew_window,test_data_list", [
         ("14d", [
@@ -550,4 +550,4 @@ class TestDrivev2:
         # check: key file is encrypted PEM
         md = env.a2md(["list", name]).json['output'][0]
         acc = md['ca']['account']
-        MDCertUtil.validate_privkey(env.path_account_key(acc), lambda *args: encrypt_key)
+        MDCertUtil.validate_privkey(env.path_account_key(acc), encrypt_key)
index 1536d39624fc87d6647939e5142fbb5d87ea6137..b4d7a0b05178a142075a80dface5bc5a1f39b85b 100644 (file)
@@ -208,8 +208,8 @@ class TestAutov2:
         # check temporary cert from server
         cert2 = MDCertUtil(env.path_fallback_cert(domain))
         assert cert1.same_serial_as(cert2), \
-            "Unexpected temporary certificate on vhost %s. Expected cn: %s , "\
-            "but found cn: %s" % (name_a, cert2.get_cn(), cert1.get_cn())
+            f"Unexpected temporary certificate on vhost {name_a}." \
+            f" Expected cn: {cert2}, but found cn: {cert1}"
 
     # test case: drive MD with only invalid challenges, domains should stay 503'd
     def test_md_702_006(self, env):
index 8433ca8cee5a0c6435962189cd8aa6f2d96cbcad..d2d8225dd62f6c34692525df2d47705e182aa9e8 100644 (file)
@@ -1,4 +1,5 @@
 # test mod_md must-staple support
+import time
 import pytest
 
 from .md_conf import MDConf
@@ -21,7 +22,8 @@ class TestMustStaple:
 
     @pytest.fixture(autouse=True, scope='function')
     def _method_scope(self, env, request):
-        self.domain = env.get_class_domain(self.__class__)
+        env.clear_store()
+        self.domain = env.get_request_domain(request)
 
     def configure_httpd(self, env, domain, add_lines=""):
         conf = MDConf(env, admin="admin@" + domain)
@@ -43,6 +45,7 @@ class TestMustStaple:
     def test_md_800_002(self, env):
         self.configure_httpd(env, self.domain, "MDMustStaple off")
         assert env.apache_restart() == 0, f'{env.apachectl_stderr}'
+        assert env.await_completion([self.domain])
         env.check_md_complete(self.domain)
         cert1 = MDCertUtil(env.store_domain_file(self.domain, 'pubcert.pem'))
         assert not cert1.get_must_staple()