]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
gpo: Add Cert Auto Enroll Advanced Config
authorDavid Mulder <dmulder@suse.com>
Tue, 12 Apr 2022 18:50:25 +0000 (12:50 -0600)
committerJeremy Allison <jra@samba.org>
Tue, 3 May 2022 21:48:57 +0000 (21:48 +0000)
Advanced configuration for Certifcate Auto
Enrollment is stored on the sysvol, and needs
to be parsed/used when provided.

Signed-off-by: David Mulder <dmulder@suse.com>
Reviewed-by: Jeremy Allison <jra@samba.org>
Autobuild-User(master): Jeremy Allison <jra@samba.org>
Autobuild-Date(master): Tue May  3 21:48:57 UTC 2022 on sn-devel-184

python/samba/gp_cert_auto_enroll_ext.py
selftest/knownfail.d/gpo [deleted file]

index 45ef4de8047789326d0b56ae8f18f79eef658834..b2903f3b7e0eaa82b7ebb2760dab3dc2ae15a786 100644 (file)
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import os
+import operator
 from samba.gpclass import gp_pol_ext
 from samba import Ldb
-from ldb import SCOPE_SUBTREE
+from ldb import SCOPE_SUBTREE, SCOPE_BASE
 from samba.auth import system_session
 from samba.gpclass import get_dc_hostname
 import base64
@@ -35,6 +36,8 @@ cert_wrap = b"""
 %s
 -----END CERTIFICATE-----"""
 global_trust_dir = '/etc/pki/trust/anchors'
+endpoint_re = '(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider' + \
+              '_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP'
 
 def octet_string_to_objectGUID(data):
     return '%s-%s-%s-%s-%s' % ('%02x' % struct.unpack('<L', data[0:4])[0],
@@ -43,6 +46,84 @@ def octet_string_to_objectGUID(data):
                                '%02x' % struct.unpack('>H', data[8:10])[0],
                                '%02x%02x' % struct.unpack('>HL', data[10:]))
 
+'''
+Group and Sort End Point Information
+[MS-CAESO] 4.4.5.3.2.3
+In this step autoenrollment processes the end point information by grouping it
+by CEP ID and sorting in the order with which it will use the end point to
+access the CEP information.
+'''
+def group_and_sort_end_point_information(end_point_information):
+    # Create groups of the CertificateEnrollmentPolicyEndPoint instances that
+    # have the same value of the EndPoint.PolicyID datum.
+    end_point_groups = {}
+    for e in end_point_information:
+        if e['PolicyID'] not in end_point_groups.keys():
+            end_point_groups[e['PolicyID']] = []
+        end_point_groups[e['PolicyID']].append(e)
+
+    # Sort each group by following these rules:
+    for end_point_group in end_point_groups.values():
+        # Sort the CertificateEnrollmentPolicyEndPoint instances in ascending
+        # order based on the EndPoint.Cost value.
+        end_point_group.sort(key=lambda e: e['Cost'])
+
+        # For instances that have the same EndPoint.Cost:
+        cost_list = [e['Cost'] for e in end_point_group]
+        costs = set(cost_list)
+        for cost in costs:
+            i = cost_list.index(cost)
+            j = len(cost_list)-operator.indexOf(reversed(cost_list), cost)-1
+            if i == j:
+                continue
+
+            # Sort those that have EndPoint.Authentication equal to Kerberos
+            # first. Then sort those that have EndPoint.Authentication equal to
+            # Anonymous. The rest of the CertificateEnrollmentPolicyEndPoint
+            # instances follow in an arbitrary order.
+            def sort_auth(e):
+                # 0x2 - Kerberos
+                if e['AuthFlags'] == 0x2:
+                    return 0
+                # 0x1 - Anonymous
+                elif e['AuthFlags'] == 0x1:
+                    return 1
+                else:
+                    return 2
+            end_point_group[i:j+1] = sorted(end_point_group[i:j+1],
+                                            key=sort_auth)
+    return list(end_point_groups.values())
+
+'''
+Obtaining End Point Information
+[MS-CAESO] 4.4.5.3.2.2
+In this step autoenrollment initializes the
+CertificateEnrollmentPolicyEndPoints table.
+'''
+def obtain_end_point_information(entries):
+    end_point_information = {}
+    section = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\'
+    for e in entries:
+        if not e.keyname.startswith(section):
+            continue
+        name = e.keyname.replace(section, '')
+        if name not in end_point_information.keys():
+            end_point_information[name] = {}
+        end_point_information[name][e.valuename] = e.data
+    for ca in end_point_information.values():
+        m = re.match(endpoint_re, ca['URL'])
+        if m:
+            name = '%s-CA' % m.group('server').replace('.', '-')
+            ca['name'] = name
+            ca['hostname'] = m.group('server')
+            ca['auth'] = m.group('auth')
+        else:
+            edata = { 'endpoint': ca['URL'] }
+            log.error('Failed to parse the endpoint', edata)
+    end_point_information = \
+        group_and_sort_end_point_information(end_point_information.values())
+    return end_point_information
+
 '''
 Initializing CAs
 [MS-CAESO] 4.4.5.3.1.2
@@ -228,18 +309,96 @@ class gp_cert_auto_enroll_ext(gp_pol_ext):
                         manage = e.data & 0x2 == 1
                         retrive_pending = e.data & 0x4 == 1
                         if enroll:
-                            url = 'ldap://%s' % get_dc_hostname(self.creds,
-                                                                self.lp)
-                            ldb = Ldb(url=url, session_info=system_session(),
-                                      lp=self.lp, credentials=self.creds)
-                            cas = fetch_certification_authorities(ldb)
-                            for ca in cas:
-                                data = cert_enroll(ca, ldb, trust_dir, private_dir)
-                                self.gp_db.store(str(self),
-                                     base64.b64encode(ca['name']).decode(),
-                                     data)
+                            self.__enroll(pol_conf.entries, trust_dir,
+                                          private_dir)
                         self.gp_db.commit()
 
+    '''
+    Read CEP Data
+    [MS-CAESO] 4.4.5.3.2.4
+    In this step autoenrollment initializes instances of the
+    CertificateEnrollmentPolicy by accessing end points associated with CEP
+    groups created in the previous step.
+    '''
+    def __read_cep_data(self, ldb, end_point_information,
+                        trust_dir, private_dir):
+        # For each group created in the previous step:
+        for end_point_group in end_point_information:
+            # Pick an arbitrary instance of the
+            # CertificateEnrollmentPolicyEndPoint from the group
+            e = end_point_group[0]
+
+            # If this instance does not have the AutoEnrollmentEnabled flag set
+            # in the EndPoint.Flags, continue with the next group.
+            if not e['Flags'] & 0x10:
+                continue
+
+            # If the current group contains a
+            # CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI
+            # equal to "LDAP":
+            if any([e['URL'] == 'LDAP:' for e in end_point_group]):
+                # Perform an LDAP search to read the value of the objectGuid
+                # attribute of the root object of the forest root domain NC. If
+                # any errors are encountered, continue with the next group.
+                res = ldb.search('', SCOPE_BASE, '(objectClass=*)',
+                                 ['rootDomainNamingContext'])
+                if len(res) != 1:
+                    continue
+                res2 = ldb.search(res[0]['rootDomainNamingContext'][0],
+                                  SCOPE_BASE, '(objectClass=*)',
+                                  ['objectGUID'])
+                if len(res2) != 1:
+                    continue
+
+                # Compare the value read in the previous step to the
+                # EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint
+                # instance. If the values do not match, continue with the next
+                # group.
+                objectGUID = '{%s}' % \
+                    octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper()
+                if objectGUID != e['PolicyID']:
+                    continue
+
+            # For each CertificateEnrollmentPolicyEndPoint instance for that
+            # group:
+            for ca in end_point_group:
+                # If EndPoint.URI equals "LDAP":
+                if ca['URL'] == 'LDAP:':
+                    # This is a basic configuration.
+                    cas = fetch_certification_authorities(ldb)
+                    for ca in cas:
+                        data = cert_enroll(ca, ldb, trust_dir, private_dir)
+                        self.gp_db.store(str(self),
+                                         base64.b64encode(ca['name']).decode(),
+                                         data)
+                # If EndPoint.URI starts with "HTTPS//":
+                elif ca['URL'].lower().startswith('https://'):
+                    data = cert_enroll(ca, ldb, trust_dir,
+                                       private_dir, auth=ca['auth'])
+                    self.gp_db.store(str(self),
+                        base64.b64encode(ca['name'].encode()).decode(),
+                        data)
+                else:
+                    edata = { 'endpoint': ca['URL'] }
+                    log.error('Unrecognized endpoint', edata)
+
+    def __enroll(self, entries, trust_dir, private_dir):
+        url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
+        ldb = Ldb(url=url, session_info=system_session(),
+                  lp=self.lp, credentials=self.creds)
+
+        end_point_information = obtain_end_point_information(entries)
+        if len(end_point_information) > 0:
+            for end_point_group in end_point_information:
+                self.__read_cep_data(ldb, end_point_information,
+                                     trust_dir, private_dir)
+        else:
+            cas = fetch_certification_authorities(ldb)
+            for ca in cas:
+                data = cert_enroll(ca, ldb, trust_dir, private_dir)
+                self.gp_db.store(str(self),
+                                 base64.b64encode(ca['name']).decode(), data)
+
     def rsop(self, gpo):
         output = {}
         pol_file = 'MACHINE/Registry.pol'
@@ -258,15 +417,26 @@ class gp_cert_auto_enroll_ext(gp_pol_ext):
                     url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
                     ldb = Ldb(url=url, session_info=system_session(),
                               lp=self.lp, credentials=self.creds)
+                    end_point_information = \
+                        obtain_end_point_information(pol_conf.entries)
                     cas = fetch_certification_authorities(ldb)
+                    if len(end_point_information) > 0:
+                        cas2 = [ep for sl in end_point_information for ep in sl]
+                        if any([ca['URL'] == 'LDAP:' for ca in cas2]):
+                            cas.extend(cas2)
+                        else:
+                            cas = cas2
                     for ca in cas:
+                        if 'URL' in ca and ca['URL'] == 'LDAP:':
+                            continue
                         policy = 'Auto Enrollment Policy'
                         cn = ca['name']
                         if policy not in output:
                             output[policy] = {}
                         output[policy][cn] = {}
-                        output[policy][cn]['CA Certificate'] = \
-                            format_root_cert(ca['cACertificate']).decode()
+                        if 'cACertificate' in ca:
+                            output[policy][cn]['CA Certificate'] = \
+                                format_root_cert(ca['cACertificate']).decode()
                         output[policy][cn]['Auto Enrollment Server'] = \
                             ca['hostname']
                         supported_templates = \
diff --git a/selftest/knownfail.d/gpo b/selftest/knownfail.d/gpo
deleted file mode 100644 (file)
index 5961cac..0000000
+++ /dev/null
@@ -1 +0,0 @@
-samba.tests.gpo.samba.tests.gpo.GPOTests.test_advanced_gp_cert_auto_enroll_ext