from http.client import HTTPConnection
from urllib.parse import urlparse
+from cryptography.hazmat._oid import ExtensionOID
+from cryptography.hazmat.bindings._rust import ObjectIdentifier
+from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat, NoEncryption, load_pem_private_key
+
from cryptography import x509
+from cryptography.x509 import DNSName, ExtensionNotFound
SEC_PER_DAY = 24 * 60 * 60
class MDCertUtil(object):
# Utility class for inspecting certificates in test cases
- # Uses PyOpenSSL: https://pyopenssl.org/en/stable/index.html
@classmethod
def load_server_cert(cls, host_ip, host_port, host_name, tls=None, ciphers=None):
connection.setblocking(1)
connection.set_tlsext_host_name(host_name.encode('utf-8'))
connection.do_handshake()
- peer_cert = connection.get_peer_certificate()
- return MDCertUtil(None, cert=peer_cert)
+ ossl_cert = connection.get_peer_certificate()
+ return MDCertUtil(None, cert=ossl_cert.to_cryptography())
@classmethod
def parse_pem_cert(cls, text):
- cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, text.encode('utf-8'))
+ cert = x509.load_pem_x509_certificate(text.encode('utf-8'))
return MDCertUtil(None, cert=cert)
@classmethod
return None
def __init__(self, cert_path, cert=None):
+ self.cert = cert
+ self.privkey = None
if cert_path is not None:
self.cert_path = cert_path
# load certificate and private key
if cert_path.startswith("http"):
- cert_data = self.get_plain(cert_path, 1)
- else:
- cert_data = MDCertUtil._load_binary_file(cert_path)
-
- for file_type in (OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1):
- try:
- self.cert = OpenSSL.crypto.load_certificate(file_type, cert_data)
- except Exception as error:
- self.error = error
- if cert is not None:
- self.cert = cert
-
- if self.cert is None:
- raise self.error
+ assert False
+ try:
+ with open(cert_path) as fd:
+ cert = x509.load_pem_x509_certificate("".join(fd.readlines()).encode())
+ except Exception as error:
+ self.error = error
+ if cert is not None:
+ self.cert = cert
+ if self.cert is None:
+ raise self.error
+
+ def add_privkey(self, path, password=None):
+ with open(path) as fd:
+ self.privkey = load_pem_private_key("".join(fd.readlines()).encode(), password=password)
def get_issuer(self):
return self.cert.get_issuer()
def get_serial(self):
# the string representation of a serial number is not unique. Some
# add leading 0s to align with word boundaries.
- return ("%lx" % (self.cert.get_serial_number())).upper()
+ return ("%lx" % (self.cert.serial_number)).upper()
@staticmethod
def _get_serial(cert) -> int:
if isinstance(cert, x509.Certificate):
return cert.serial_number
if isinstance(cert, MDCertUtil):
- return cert.get_serial_number()
- elif isinstance(cert, OpenSSL.crypto.X509):
- return cert.get_serial_number()
+ return cert.cert.serial_number
elif isinstance(cert, str):
# assume a hex number
return int(cert, 16)
elif isinstance(cert, int):
return cert
+ assert False, f'{cert}'
return 0
def get_serial_number(self):
return self._get_serial(self.cert) == self._get_serial(other)
def get_not_before(self):
- tsp = self.cert.get_notBefore()
- return self._parse_tsp(tsp)
+ try:
+ return self.cert.not_valid_before_utc
+ except AttributeError:
+ return self.cert.not_valid_before
def get_not_after(self):
- tsp = self.cert.get_notAfter()
- return self._parse_tsp(tsp)
-
- def get_cn(self):
- return self.cert.get_subject().CN
+ try:
+ return self.cert.not_valid_after_utc
+ except AttributeError:
+ return self.cert.not_valid_after
def get_key_length(self):
- return self.cert.get_pubkey().bits()
+ return self.cert.public_key().key_size
def get_san_list(self):
- text = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_TEXT, self.cert).decode("utf-8")
- m = re.search(r"X509v3 Subject Alternative Name:(\s+critical)?\s*(.*)", text)
- sans_list = []
- if m:
- sans_list = m.group(2).split(",")
-
- def _strip_prefix(s):
- return s.split(":")[1] if s.strip().startswith("DNS:") else s.strip()
- return list(map(_strip_prefix, sans_list))
+ sans = self.cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
+ return sans.value.get_values_for_type(DNSName)
def get_must_staple(self):
- text = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_TEXT, self.cert).decode("utf-8")
- m = re.search(r"1.3.6.1.5.5.7.1.24:\s*\n\s*0....", text)
- if not m:
- # Newer openssl versions print this differently
- m = re.search(r"TLS Feature:\s*\n\s*status_request\s*\n", text)
- return m is not None
+ try:
+ self.cert.extensions.get_extension_for_oid(ExtensionOID.TLS_FEATURE)
+ return True
+ except ExtensionNotFound:
+ return False
@classmethod
def validate_privkey(cls, privkey_path, passphrase=None):
- privkey_data = cls._load_binary_file(privkey_path)
- if passphrase:
- privkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey_data, passphrase)
- else:
- privkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey_data)
- return privkey.check()
-
- def validate_cert_matches_priv_key(self, privkey_path):
- # Verifies that the private key and cert match.
- privkey_data = MDCertUtil._load_binary_file(privkey_path)
- privkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey_data)
- context = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
- context.use_privatekey(privkey)
- context.use_certificate(self.cert)
- context.check_privatekey()
-
- # --------- _utils_ ---------
-
- def astr(self, s):
- return s.decode('utf-8')
-
- def _parse_tsp(self, tsp):
- # timestampss returned by PyOpenSSL are bytes
- # parse date and time part
- s = ("%s-%s-%s %s:%s:%s" % (self.astr(tsp[0:4]), self.astr(tsp[4:6]), self.astr(tsp[6:8]),
- self.astr(tsp[8:10]), self.astr(tsp[10:12]), self.astr(tsp[12:14])))
- timestamp = datetime.strptime(s, '%Y-%m-%d %H:%M:%S')
- # adjust timezone
- tz_h, tz_m = 0, 0
- m = re.match(r"([+\-]\d{2})(\d{2})", self.astr(tsp[14:]))
- if m:
- tz_h, tz_m = int(m.group(1)), int(m.group(2)) if tz_h > 0 else -1 * int(m.group(2))
- return timestamp.replace(tzinfo=self.FixedOffset(60 * tz_h + tz_m))
-
- @classmethod
- def _load_binary_file(cls, path):
- with open(path, mode="rb") as file:
- return file.read()
-
- class FixedOffset(tzinfo):
-
- def __init__(self, offset):
- self.__offset = timedelta(minutes=offset)
-
- def utcoffset(self, dt):
- return self.__offset
-
- def tzname(self, dt):
- return None
-
- def dst(self, dt):
- return timedelta(0)
+ with open(privkey_path) as fd:
+ privkey = load_pem_private_key("".join(fd.readlines()).encode(), password=passphrase)
+ return privkey is not None