From: Joseph Sutton Date: Thu, 28 Sep 2023 03:12:46 +0000 (+1300) Subject: tests/krb5: Add method to replace the device SIDs in a PAC X-Git-Tag: tevent-0.16.0~329 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6f5368dd32689019fff8071ec4601971712dd1d2;p=thirdparty%2Fsamba.git tests/krb5: Add method to replace the device SIDs in a PAC Signed-off-by: Joseph Sutton Reviewed-by: Andrew Bartlett --- diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py index ce8ff39096d..73ed4c42bed 100644 --- a/python/samba/tests/krb5/kdc_base_test.py +++ b/python/samba/tests/krb5/kdc_base_test.py @@ -1686,6 +1686,145 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): return pac + # Replace the device SIDs in a PAC with 'new_sids'. + def set_pac_device_sids(self, + pac, + *, + new_sids, + domain_sid=None, + user_rid): + if domain_sid is None: + domain_sid = self.get_samdb().get_domain_sid() + + base_sids = [] + extra_sids = [] + resource_sids = [] + + primary_gid = None + + # Filter our SIDs into three arrays depending on their ultimate + # location in the PAC. + for entry in new_sids: + if isinstance(entry, frozenset): + resource_domain = None + domain_sids = [] + + for sid, sid_type, attrs in entry: + self.assertIs(sid_type, self.SidType.RESOURCE_SID, + 'only resource SIDs may be specified in this way') + + if isinstance(sid, int): + domain, rid = domain_sid, sid + else: + domain, rid = sid.rsplit('-', 1) + if resource_domain is None: + resource_domain = domain + else: + self.assertEqual(resource_domain, domain, + 'resource SIDs must share the same ' + 'domain') + + resource_sid = samr.RidWithAttribute() + resource_sid.rid = int(rid) + resource_sid.attributes = attrs + + domain_sids.append(resource_sid) + + membership = krb5pac.PAC_DOMAIN_GROUP_MEMBERSHIP() + if resource_domain is not None: + membership.domain_sid = security.dom_sid(resource_domain) + membership.groups.rids = domain_sids + membership.groups.count = len(domain_sids) + + resource_sids.append(membership) + else: + sid, sid_type, attrs = entry + if sid_type is self.SidType.BASE_SID: + if isinstance(sid, int): + domain, rid = domain_sid, sid + else: + domain, rid = sid.rsplit('-', 1) + self.assertEqual(domain_sid, domain, + f'base SID {sid} must be in our domain') + + base_sid = samr.RidWithAttribute() + base_sid.rid = int(rid) + base_sid.attributes = attrs + + base_sids.append(base_sid) + elif sid_type is self.SidType.EXTRA_SID: + extra_sid = netlogon.netr_SidAttr() + extra_sid.sid = security.dom_sid(sid) + extra_sid.attributes = attrs + + extra_sids.append(extra_sid) + elif sid_type is self.SidType.RESOURCE_SID: + self.fail('specify resource groups in frozenset(s)') + elif sid_type is self.SidType.PRIMARY_GID: + self.assertIsNone(primary_gid, + f'must not specify a second primary GID ' + f'{sid}') + self.assertIsNone(attrs, 'cannot specify primary GID attrs') + + if isinstance(sid, int): + domain, primary_gid = domain_sid, sid + else: + domain, primary_gid = sid.rsplit('-', 1) + self.assertEqual(domain_sid, domain, + f'primary GID {sid} must be in our domain') + else: + self.fail(f'invalid SID type {sid_type}') + + pac_buffers = pac.buffers + for pac_buffer in pac_buffers: + # Find the DEVICE_INFO PAC buffer. + if pac_buffer.type == krb5pac.PAC_TYPE_DEVICE_INFO: + logon_info = pac_buffer.info.info + break + else: + logon_info = krb5pac.PAC_DEVICE_INFO() + + logon_info_ctr = krb5pac.PAC_DEVICE_INFO_CTR() + logon_info_ctr.info = logon_info + + pac_buffer = krb5pac.PAC_BUFFER() + pac_buffer.type = krb5pac.PAC_TYPE_DEVICE_INFO + pac_buffer.info = logon_info_ctr + + pac_buffers.append(pac_buffer) + + logon_info.domain_sid = security.dom_sid(domain_sid) + logon_info.rid = int(user_rid) + + self.assertIsNotNone(primary_gid, 'please specify the primary GID') + logon_info.primary_gid = int(primary_gid) + + # Add Base SIDs. + if base_sids: + logon_info.groups.rids = base_sids + else: + logon_info.groups.rids = None + logon_info.groups.count = len(base_sids) + + # Add Extra SIDs. + if extra_sids: + logon_info.sids = extra_sids + else: + logon_info.sids = None + logon_info.sid_count = len(extra_sids) + + # Add Resource SIDs. + if resource_sids: + logon_info.domain_groups = resource_sids + else: + logon_info.domain_groups = None + logon_info.domain_group_count = len(resource_sids) + + pac.buffers = pac_buffers + pac.num_buffers = len(pac_buffers) + + return pac + def get_cached_creds(self, *, account_type, opts=None,