]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
gpo: Remove sscep depends from Cert Auto Enroll
authorDavid Mulder <dmulder@suse.com>
Wed, 4 May 2022 21:01:22 +0000 (15:01 -0600)
committerAndreas Schneider <asn@cryptomilk.org>
Fri, 13 May 2022 14:46:29 +0000 (14:46 +0000)
Certificate Auto Enrollment currently depends on
sscep to retrieve the root certificate chain.
This isn't necessary, since this can be
accomplished with a simple GET.

Signed-off-by: David Mulder <dmulder@suse.com>
Reviewed-by: Andreas Schneider <asn@samba.org>
python/samba/gp_cert_auto_enroll_ext.py
python/samba/tests/bin/sscep [deleted file]
python/samba/tests/gpo.py

index 7b604e5065d48f2945defa582f24ef0b3da7414a..7cf19b8c839444edc7fa8c26a893196e1390d8ec 100644 (file)
@@ -16,6 +16,7 @@
 
 import os
 import operator
+import requests
 from samba.gpclass import gp_pol_ext
 from samba import Ldb
 from ldb import SCOPE_SUBTREE, SCOPE_BASE
@@ -25,10 +26,19 @@ import base64
 from shutil import which
 from subprocess import Popen, PIPE
 import re
-from glob import glob
 import json
 from samba.gp.util.logging import log
 import struct
+try:
+    from cryptography.hazmat.primitives.serialization.pkcs7 import \
+        load_der_pkcs7_certificates
+except ModuleNotFoundError:
+    def load_der_pkcs7_certificates(x): return []
+    log.error('python cryptography missing pkcs7 support. '
+              'Certificate chain parsing will fail')
+from cryptography.hazmat.primitives.serialization import Encoding
+from cryptography.x509 import load_der_x509_certificate
+from cryptography.hazmat.backends import default_backend
 
 cert_wrap = b"""
 -----BEGIN CERTIFICATE-----
@@ -180,43 +190,72 @@ def get_supported_templates(server):
         return out.strip().split()
     return []
 
+
+def getca(ca_name, url, trust_dir):
+    """Fetch Certificate Chain from the CA."""
+    root_cert = os.path.join(trust_dir, '%s.crt' % ca_name)
+    root_certs = []
+
+    try:
+        r = requests.get(url=url, params={'operation': 'GetCACert',
+                                          'message': 'CAIdentifier'})
+    except requests.exceptions.ConnectionError:
+        log.warn('Failed to establish a new connection')
+        r = None
+    if r is None or r.content == b'':
+        log.warn('Failed to fetch the root certificate chain.')
+        log.warn('Ensure you have installed and configured the'
+                 ' Network Device Enrollment Service.')
+        return root_certs
+
+    if r.headers['Content-Type'] == 'application/x-x509-ca-cert':
+        # Older versions of load_der_x509_certificate require a backend param
+        try:
+            cert = load_der_x509_certificate(r.content)
+        except TypeError:
+            cert = load_der_x509_certificate(r.content, default_backend())
+        cert_data = cert.public_bytes(Encoding.PEM)
+        with open(root_cert, 'wb') as w:
+            w.write(cert_data)
+        root_certs.append(root_cert)
+    elif r.headers['Content-Type'] == 'application/x-x509-ca-ra-cert':
+        certs = load_der_pkcs7_certificates(r.content)
+        for i in range(0, len(certs)):
+            cert = certs[i].public_bytes(Encoding.PEM)
+            dest = '%s.%d' % (root_cert, i)
+            with open(dest, 'wb') as w:
+                w.write(cert)
+            root_certs.append(dest)
+    else:
+        log.warn('getca: Wrong (or missing) MIME content type')
+
+    return root_certs
+
+
 def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
     # Install the root certificate chain
     data = {'files': [], 'templates': []}
-    sscep = which('sscep')
-    if sscep is not None:
-        url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % \
-            ca['hostname']
-        root_cert = os.path.join(trust_dir, '%s.crt' % ca['name'])
-        ret = Popen([sscep, 'getca', '-F', 'sha1', '-c',
-                     root_cert, '-u', url]).wait()
-        if ret != 0:
-            log.warn('sscep failed to fetch the root certificate chain.')
-            log.warn('Ensure you have installed and configured the' +
-                     ' Network Device Enrollment Service.')
-        root_certs = glob('%s*' % root_cert)
-        data['files'].extend(root_certs)
-        for src in root_certs:
-            # Symlink the certs to global trust dir
-            dst = os.path.join(global_trust_dir, os.path.basename(src))
-            try:
-                os.symlink(src, dst)
-                data['files'].append(dst)
-            except PermissionError:
-                log.warn('Failed to symlink root certificate to the' +
-                         ' admin trust anchors')
-            except FileNotFoundError:
-                log.warn('Failed to symlink root certificate to the' +
-                         ' admin trust anchors.' +
-                         ' The directory was not found', global_trust_dir)
-            except FileExistsError:
-                # If we're simply downloading a renewed cert, the symlink
-                # already exists. Ignore the FileExistsError. Preserve the
-                # existing symlink in the unapply data.
-                data['files'].append(dst)
-    else:
-        log.warn('sscep is not installed, which prevents the installation' +
-                 ' of the root certificate chain.')
+    url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']
+    root_certs = getca(ca['name'], url, trust_dir)
+    data['files'].extend(root_certs)
+    for src in root_certs:
+        # Symlink the certs to global trust dir
+        dst = os.path.join(global_trust_dir, os.path.basename(src))
+        try:
+            os.symlink(src, dst)
+            data['files'].append(dst)
+        except PermissionError:
+            log.warn('Failed to symlink root certificate to the'
+                     ' admin trust anchors')
+        except FileNotFoundError:
+            log.warn('Failed to symlink root certificate to the'
+                     ' admin trust anchors.'
+                     ' The directory was not found', global_trust_dir)
+        except FileExistsError:
+            # If we're simply downloading a renewed cert, the symlink
+            # already exists. Ignore the FileExistsError. Preserve the
+            # existing symlink in the unapply data.
+            data['files'].append(dst)
     update = which('update-ca-certificates')
     if update is not None:
         Popen([update]).wait()
diff --git a/python/samba/tests/bin/sscep b/python/samba/tests/bin/sscep
deleted file mode 100755 (executable)
index d0d8892..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-#!/usr/bin/python3
-import optparse
-import os, sys, re
-
-sys.path.insert(0, "bin/python")
-
-if __name__ == "__main__":
-    parser = optparse.OptionParser('sscep <cmd> [options]')
-    parser.add_option('-F')
-    parser.add_option('-c')
-    parser.add_option('-u')
-
-    (opts, args) = parser.parse_args()
-    assert len(args) == 1
-    assert args[0] == 'getca'
-    assert opts.F == 'sha1'
-    # Create dummy root cert (empty file)
-    with open(opts.c, 'w') as w:
-        pass
index 7d3cb878b93cc15c615c84463f2013af67513048..e1f81e8a50d5334cd8dc35211b8be7498d2717b2 100644 (file)
@@ -41,8 +41,7 @@ from samba.vgp_motd_ext import vgp_motd_ext
 from samba.vgp_issue_ext import vgp_issue_ext
 from samba.vgp_access_ext import vgp_access_ext
 from samba.gp_gnome_settings_ext import gp_gnome_settings_ext
-from samba.gp_cert_auto_enroll_ext import gp_cert_auto_enroll_ext, \
-                                          octet_string_to_objectGUID
+from samba import gp_cert_auto_enroll_ext as cae
 from samba.gp_firefox_ext import gp_firefox_ext
 from samba.gp_chromium_ext import gp_chromium_ext
 from samba.gp_firewalld_ext import gp_firewalld_ext
@@ -67,6 +66,51 @@ import ldb as _ldb
 from samba.auth import system_session
 import json
 from shutil import which
+import requests
+from cryptography import x509
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.serialization import Encoding
+from datetime import datetime, timedelta
+
+def dummy_certificate():
+    name = x509.Name([
+        x509.NameAttribute(x509.NameOID.COMMON_NAME,
+                           os.environ.get('SERVER'))
+    ])
+    cons = x509.BasicConstraints(ca=True, path_length=0)
+    now = datetime.utcnow()
+
+    key = rsa.generate_private_key(public_exponent=65537, key_size=2048,
+                                   backend=default_backend())
+
+    cert = (
+        x509.CertificateBuilder()
+        .subject_name(name)
+        .issuer_name(name)
+        .public_key(key.public_key())
+        .serial_number(1000)
+        .not_valid_before(now)
+        .not_valid_after(now + timedelta(seconds=300))
+        .add_extension(cons, False)
+        .sign(key, hashes.SHA256(), default_backend())
+    )
+
+    return cert.public_bytes(encoding=Encoding.DER)
+
+# Dummy requests structure for Certificate Auto Enrollment
+class dummy_requests(object):
+    @staticmethod
+    def get(url=None, params=None):
+        dummy = requests.Response()
+        dummy._content = dummy_certificate()
+        dummy.headers = {'Content-Type': 'application/x-x509-ca-cert'}
+        return dummy
+
+    class exceptions(object):
+        ConnectionError = Exception
+cae.requests = dummy_requests
 
 realm = os.environ.get('REALM')
 policies = realm + '/POLICIES'
@@ -8708,8 +8752,8 @@ class GPOTests(tests.TestCase):
         machine_creds.set_machine_account()
 
         # Initialize the group policy extension
-        ext = gp_cert_auto_enroll_ext(self.lp, machine_creds,
-                                      machine_creds.get_username(), store)
+        ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds,
+                                          machine_creds.get_username(), store)
 
         ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
         if ads.connect():
@@ -9069,8 +9113,8 @@ class GPOTests(tests.TestCase):
         machine_creds.set_machine_account()
 
         # Initialize the group policy extension
-        ext = gp_cert_auto_enroll_ext(self.lp, machine_creds,
-                                      machine_creds.get_username(), store)
+        ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds,
+                                          machine_creds.get_username(), store)
 
         ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
         if ads.connect():
@@ -9093,7 +9137,7 @@ class GPOTests(tests.TestCase):
                           _ldb.SCOPE_BASE, '(objectClass=*)', ['objectGUID'])
         self.assertTrue(len(res2) == 1, 'objectGUID not found')
         objectGUID = b'{%s}' % \
-            octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper().encode()
+            cae.octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper().encode()
         parser = GPPolParser()
         parser.load_xml(etree.fromstring(advanced_enroll_reg_pol.strip() % \
             (objectGUID, objectGUID, objectGUID, objectGUID)))