From: David Mulder Date: Wed, 4 May 2022 21:01:22 +0000 (-0600) Subject: gpo: Remove sscep depends from Cert Auto Enroll X-Git-Tag: talloc-2.3.4~144 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d3e0eec03cd93dcceaec7328ba8252bfa78f968e;p=thirdparty%2Fsamba.git gpo: Remove sscep depends from Cert Auto Enroll Certificate Auto Enrollment currently depends on sscep to retrieve the root certificate chain. This isn't necessary, since this can be accomplished with a simple GET. Signed-off-by: David Mulder Reviewed-by: Andreas Schneider --- diff --git a/python/samba/gp_cert_auto_enroll_ext.py b/python/samba/gp_cert_auto_enroll_ext.py index 7b604e5065d..7cf19b8c839 100644 --- a/python/samba/gp_cert_auto_enroll_ext.py +++ b/python/samba/gp_cert_auto_enroll_ext.py @@ -16,6 +16,7 @@ import os import operator +import requests from samba.gpclass import gp_pol_ext from samba import Ldb from ldb import SCOPE_SUBTREE, SCOPE_BASE @@ -25,10 +26,19 @@ import base64 from shutil import which from subprocess import Popen, PIPE import re -from glob import glob import json from samba.gp.util.logging import log import struct +try: + from cryptography.hazmat.primitives.serialization.pkcs7 import \ + load_der_pkcs7_certificates +except ModuleNotFoundError: + def load_der_pkcs7_certificates(x): return [] + log.error('python cryptography missing pkcs7 support. ' + 'Certificate chain parsing will fail') +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography.x509 import load_der_x509_certificate +from cryptography.hazmat.backends import default_backend cert_wrap = b""" -----BEGIN CERTIFICATE----- @@ -180,43 +190,72 @@ def get_supported_templates(server): return out.strip().split() return [] + +def getca(ca_name, url, trust_dir): + """Fetch Certificate Chain from the CA.""" + root_cert = os.path.join(trust_dir, '%s.crt' % ca_name) + root_certs = [] + + try: + r = requests.get(url=url, params={'operation': 'GetCACert', + 'message': 'CAIdentifier'}) + except requests.exceptions.ConnectionError: + log.warn('Failed to establish a new connection') + r = None + if r is None or r.content == b'': + log.warn('Failed to fetch the root certificate chain.') + log.warn('Ensure you have installed and configured the' + ' Network Device Enrollment Service.') + return root_certs + + if r.headers['Content-Type'] == 'application/x-x509-ca-cert': + # Older versions of load_der_x509_certificate require a backend param + try: + cert = load_der_x509_certificate(r.content) + except TypeError: + cert = load_der_x509_certificate(r.content, default_backend()) + cert_data = cert.public_bytes(Encoding.PEM) + with open(root_cert, 'wb') as w: + w.write(cert_data) + root_certs.append(root_cert) + elif r.headers['Content-Type'] == 'application/x-x509-ca-ra-cert': + certs = load_der_pkcs7_certificates(r.content) + for i in range(0, len(certs)): + cert = certs[i].public_bytes(Encoding.PEM) + dest = '%s.%d' % (root_cert, i) + with open(dest, 'wb') as w: + w.write(cert) + root_certs.append(dest) + else: + log.warn('getca: Wrong (or missing) MIME content type') + + return root_certs + + def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'): # Install the root certificate chain data = {'files': [], 'templates': []} - sscep = which('sscep') - if sscep is not None: - url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % \ - ca['hostname'] - root_cert = os.path.join(trust_dir, '%s.crt' % ca['name']) - ret = Popen([sscep, 'getca', '-F', 'sha1', '-c', - root_cert, '-u', url]).wait() - if ret != 0: - log.warn('sscep failed to fetch the root certificate chain.') - log.warn('Ensure you have installed and configured the' + - ' Network Device Enrollment Service.') - root_certs = glob('%s*' % root_cert) - data['files'].extend(root_certs) - for src in root_certs: - # Symlink the certs to global trust dir - dst = os.path.join(global_trust_dir, os.path.basename(src)) - try: - os.symlink(src, dst) - data['files'].append(dst) - except PermissionError: - log.warn('Failed to symlink root certificate to the' + - ' admin trust anchors') - except FileNotFoundError: - log.warn('Failed to symlink root certificate to the' + - ' admin trust anchors.' + - ' The directory was not found', global_trust_dir) - except FileExistsError: - # If we're simply downloading a renewed cert, the symlink - # already exists. Ignore the FileExistsError. Preserve the - # existing symlink in the unapply data. - data['files'].append(dst) - else: - log.warn('sscep is not installed, which prevents the installation' + - ' of the root certificate chain.') + url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname'] + root_certs = getca(ca['name'], url, trust_dir) + data['files'].extend(root_certs) + for src in root_certs: + # Symlink the certs to global trust dir + dst = os.path.join(global_trust_dir, os.path.basename(src)) + try: + os.symlink(src, dst) + data['files'].append(dst) + except PermissionError: + log.warn('Failed to symlink root certificate to the' + ' admin trust anchors') + except FileNotFoundError: + log.warn('Failed to symlink root certificate to the' + ' admin trust anchors.' + ' The directory was not found', global_trust_dir) + except FileExistsError: + # If we're simply downloading a renewed cert, the symlink + # already exists. Ignore the FileExistsError. Preserve the + # existing symlink in the unapply data. + data['files'].append(dst) update = which('update-ca-certificates') if update is not None: Popen([update]).wait() diff --git a/python/samba/tests/bin/sscep b/python/samba/tests/bin/sscep deleted file mode 100755 index d0d88926766..00000000000 --- a/python/samba/tests/bin/sscep +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/python3 -import optparse -import os, sys, re - -sys.path.insert(0, "bin/python") - -if __name__ == "__main__": - parser = optparse.OptionParser('sscep [options]') - parser.add_option('-F') - parser.add_option('-c') - parser.add_option('-u') - - (opts, args) = parser.parse_args() - assert len(args) == 1 - assert args[0] == 'getca' - assert opts.F == 'sha1' - # Create dummy root cert (empty file) - with open(opts.c, 'w') as w: - pass diff --git a/python/samba/tests/gpo.py b/python/samba/tests/gpo.py index 7d3cb878b93..e1f81e8a50d 100644 --- a/python/samba/tests/gpo.py +++ b/python/samba/tests/gpo.py @@ -41,8 +41,7 @@ from samba.vgp_motd_ext import vgp_motd_ext from samba.vgp_issue_ext import vgp_issue_ext from samba.vgp_access_ext import vgp_access_ext from samba.gp_gnome_settings_ext import gp_gnome_settings_ext -from samba.gp_cert_auto_enroll_ext import gp_cert_auto_enroll_ext, \ - octet_string_to_objectGUID +from samba import gp_cert_auto_enroll_ext as cae from samba.gp_firefox_ext import gp_firefox_ext from samba.gp_chromium_ext import gp_chromium_ext from samba.gp_firewalld_ext import gp_firewalld_ext @@ -67,6 +66,51 @@ import ldb as _ldb from samba.auth import system_session import json from shutil import which +import requests +from cryptography import x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.serialization import Encoding +from datetime import datetime, timedelta + +def dummy_certificate(): + name = x509.Name([ + x509.NameAttribute(x509.NameOID.COMMON_NAME, + os.environ.get('SERVER')) + ]) + cons = x509.BasicConstraints(ca=True, path_length=0) + now = datetime.utcnow() + + key = rsa.generate_private_key(public_exponent=65537, key_size=2048, + backend=default_backend()) + + cert = ( + x509.CertificateBuilder() + .subject_name(name) + .issuer_name(name) + .public_key(key.public_key()) + .serial_number(1000) + .not_valid_before(now) + .not_valid_after(now + timedelta(seconds=300)) + .add_extension(cons, False) + .sign(key, hashes.SHA256(), default_backend()) + ) + + return cert.public_bytes(encoding=Encoding.DER) + +# Dummy requests structure for Certificate Auto Enrollment +class dummy_requests(object): + @staticmethod + def get(url=None, params=None): + dummy = requests.Response() + dummy._content = dummy_certificate() + dummy.headers = {'Content-Type': 'application/x-x509-ca-cert'} + return dummy + + class exceptions(object): + ConnectionError = Exception +cae.requests = dummy_requests realm = os.environ.get('REALM') policies = realm + '/POLICIES' @@ -8708,8 +8752,8 @@ class GPOTests(tests.TestCase): machine_creds.set_machine_account() # Initialize the group policy extension - ext = gp_cert_auto_enroll_ext(self.lp, machine_creds, - machine_creds.get_username(), store) + ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds, + machine_creds.get_username(), store) ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds) if ads.connect(): @@ -9069,8 +9113,8 @@ class GPOTests(tests.TestCase): machine_creds.set_machine_account() # Initialize the group policy extension - ext = gp_cert_auto_enroll_ext(self.lp, machine_creds, - machine_creds.get_username(), store) + ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds, + machine_creds.get_username(), store) ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds) if ads.connect(): @@ -9093,7 +9137,7 @@ class GPOTests(tests.TestCase): _ldb.SCOPE_BASE, '(objectClass=*)', ['objectGUID']) self.assertTrue(len(res2) == 1, 'objectGUID not found') objectGUID = b'{%s}' % \ - octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper().encode() + cae.octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper().encode() parser = GPPolParser() parser.load_xml(etree.fromstring(advanced_enroll_reg_pol.strip() % \ (objectGUID, objectGUID, objectGUID, objectGUID)))