]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
tests/krb5: Add method to replace the device SIDs in a PAC
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Thu, 28 Sep 2023 03:12:46 +0000 (16:12 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Thu, 28 Sep 2023 03:33:38 +0000 (03:33 +0000)
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/krb5/kdc_base_test.py

index ce8ff39096d767fa0585e2fd452aa1508c8f0a5c..73ed4c42bed9254d9fa35a3f0a30331ca15d634a 100644 (file)
@@ -1686,6 +1686,145 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
 
         return pac
 
+    # Replace the device SIDs in a PAC with 'new_sids'.
+    def set_pac_device_sids(self,
+                            pac,
+                            *,
+                            new_sids,
+                            domain_sid=None,
+                            user_rid):
+        if domain_sid is None:
+            domain_sid = self.get_samdb().get_domain_sid()
+
+        base_sids = []
+        extra_sids = []
+        resource_sids = []
+
+        primary_gid = None
+
+        # Filter our SIDs into three arrays depending on their ultimate
+        # location in the PAC.
+        for entry in new_sids:
+            if isinstance(entry, frozenset):
+                resource_domain = None
+                domain_sids = []
+
+                for sid, sid_type, attrs in entry:
+                    self.assertIs(sid_type, self.SidType.RESOURCE_SID,
+                                  'only resource SIDs may be specified in this way')
+
+                    if isinstance(sid, int):
+                        domain, rid = domain_sid, sid
+                    else:
+                        domain, rid = sid.rsplit('-', 1)
+                    if resource_domain is None:
+                        resource_domain = domain
+                    else:
+                        self.assertEqual(resource_domain, domain,
+                                         'resource SIDs must share the same '
+                                         'domain')
+
+                    resource_sid = samr.RidWithAttribute()
+                    resource_sid.rid = int(rid)
+                    resource_sid.attributes = attrs
+
+                    domain_sids.append(resource_sid)
+
+                membership = krb5pac.PAC_DOMAIN_GROUP_MEMBERSHIP()
+                if resource_domain is not None:
+                    membership.domain_sid = security.dom_sid(resource_domain)
+                membership.groups.rids = domain_sids
+                membership.groups.count = len(domain_sids)
+
+                resource_sids.append(membership)
+            else:
+                sid, sid_type, attrs = entry
+                if sid_type is self.SidType.BASE_SID:
+                    if isinstance(sid, int):
+                        domain, rid = domain_sid, sid
+                    else:
+                        domain, rid = sid.rsplit('-', 1)
+                    self.assertEqual(domain_sid, domain,
+                                     f'base SID {sid} must be in our domain')
+
+                    base_sid = samr.RidWithAttribute()
+                    base_sid.rid = int(rid)
+                    base_sid.attributes = attrs
+
+                    base_sids.append(base_sid)
+                elif sid_type is self.SidType.EXTRA_SID:
+                    extra_sid = netlogon.netr_SidAttr()
+                    extra_sid.sid = security.dom_sid(sid)
+                    extra_sid.attributes = attrs
+
+                    extra_sids.append(extra_sid)
+                elif sid_type is self.SidType.RESOURCE_SID:
+                    self.fail('specify resource groups in frozenset(s)')
+                elif sid_type is self.SidType.PRIMARY_GID:
+                    self.assertIsNone(primary_gid,
+                                      f'must not specify a second primary GID '
+                                      f'{sid}')
+                    self.assertIsNone(attrs, 'cannot specify primary GID attrs')
+
+                    if isinstance(sid, int):
+                        domain, primary_gid = domain_sid, sid
+                    else:
+                        domain, primary_gid = sid.rsplit('-', 1)
+                    self.assertEqual(domain_sid, domain,
+                                     f'primary GID {sid} must be in our domain')
+                else:
+                    self.fail(f'invalid SID type {sid_type}')
+
+        pac_buffers = pac.buffers
+        for pac_buffer in pac_buffers:
+            # Find the DEVICE_INFO PAC buffer.
+            if pac_buffer.type == krb5pac.PAC_TYPE_DEVICE_INFO:
+                logon_info = pac_buffer.info.info
+                break
+        else:
+            logon_info = krb5pac.PAC_DEVICE_INFO()
+
+            logon_info_ctr = krb5pac.PAC_DEVICE_INFO_CTR()
+            logon_info_ctr.info = logon_info
+
+            pac_buffer = krb5pac.PAC_BUFFER()
+            pac_buffer.type = krb5pac.PAC_TYPE_DEVICE_INFO
+            pac_buffer.info = logon_info_ctr
+
+            pac_buffers.append(pac_buffer)
+
+        logon_info.domain_sid = security.dom_sid(domain_sid)
+        logon_info.rid = int(user_rid)
+
+        self.assertIsNotNone(primary_gid, 'please specify the primary GID')
+        logon_info.primary_gid = int(primary_gid)
+
+        # Add Base SIDs.
+        if base_sids:
+            logon_info.groups.rids = base_sids
+        else:
+            logon_info.groups.rids = None
+        logon_info.groups.count = len(base_sids)
+
+        # Add Extra SIDs.
+        if extra_sids:
+            logon_info.sids = extra_sids
+        else:
+            logon_info.sids = None
+        logon_info.sid_count = len(extra_sids)
+
+        # Add Resource SIDs.
+        if resource_sids:
+            logon_info.domain_groups = resource_sids
+        else:
+            logon_info.domain_groups = None
+        logon_info.domain_group_count = len(resource_sids)
+
+        pac.buffers = pac_buffers
+        pac.num_buffers = len(pac_buffers)
+
+        return pac
+
     def get_cached_creds(self, *,
                          account_type,
                          opts=None,