From: Joseph Sutton Date: Mon, 20 Feb 2023 02:19:01 +0000 (+1300) Subject: tests/krb5: Move ticket_with_sids() to base class X-Git-Tag: talloc-2.4.1~1552 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=285f042e2ff9aa4a46f7e4acf9b8971fb5ebf18d;p=thirdparty%2Fsamba.git tests/krb5: Move ticket_with_sids() to base class We need to use this in another test. Signed-off-by: Joseph Sutton Reviewed-by: Andrew Bartlett --- diff --git a/python/samba/tests/krb5/group_tests.py b/python/samba/tests/krb5/group_tests.py index c146e926033..9ece5e64271 100755 --- a/python/samba/tests/krb5/group_tests.py +++ b/python/samba/tests/krb5/group_tests.py @@ -26,12 +26,10 @@ os.environ['PYTHONUNBUFFERED'] = '1' import random import re -from functools import partial - import ldb from samba import werror -from samba.dcerpc import krb5pac, netlogon, samr, security +from samba.dcerpc import netlogon, security from samba.tests import DynamicTestCase, env_get_var_value from samba.tests.krb5 import kcrypto from samba.tests.krb5.kdc_base_test import GroupType, KDCBaseTest, Principal @@ -423,163 +421,6 @@ class GroupTests(KDCBaseTest): self.assertEqual(domain_local_sid, str(sids[0].sid)) self.assertEqual(self.resource_attrs, sids[0].attributes) - # Get a ticket with the SIDs in the PAC replaced with ones we specify. This - # is useful for creating arbitrary tickets that can be used to perform a - # TGS-REQ. - def ticket_with_sids(self, - ticket, - new_sids, - domain_sid, - user_rid, - set_user_flags=0, - reset_user_flags=0): - krbtgt_creds = self.get_krbtgt_creds() - krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) - - checksum_keys = { - krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key - } - - modify_pac_fn = partial(self.set_pac_sids, - new_sids=new_sids, - domain_sid=domain_sid, - user_rid=user_rid, - set_user_flags=set_user_flags, - reset_user_flags=reset_user_flags) - - return self.modified_ticket(ticket, - modify_pac_fn=modify_pac_fn, - checksum_keys=checksum_keys) - - # Replace the SIDs in a PAC with 'new_sids'. - def set_pac_sids(self, - pac, - new_sids, - domain_sid, - user_rid, - set_user_flags=0, - reset_user_flags=0): - base_sids = [] - extra_sids = [] - resource_sids = [] - - resource_domain = None - - primary_gid = None - - # Filter our SIDs into three arrays depending on their ultimate - # location in the PAC. - for sid, sid_type, attrs in new_sids: - if sid_type is self.SidType.BASE_SID: - domain, rid = sid.rsplit('-', 1) - self.assertEqual(domain_sid, domain, - f'base SID {sid} must be in our domain') - - base_sid = samr.RidWithAttribute() - base_sid.rid = int(rid) - base_sid.attributes = attrs - - base_sids.append(base_sid) - elif sid_type is self.SidType.EXTRA_SID: - extra_sid = netlogon.netr_SidAttr() - extra_sid.sid = security.dom_sid(sid) - extra_sid.attributes = attrs - - extra_sids.append(extra_sid) - elif sid_type is self.SidType.RESOURCE_SID: - domain, rid = sid.rsplit('-', 1) - if resource_domain is None: - resource_domain = domain - else: - self.assertEqual(resource_domain, domain, - 'resource SIDs must share the same ' - 'domain') - - resource_sid = samr.RidWithAttribute() - resource_sid.rid = int(rid) - resource_sid.attributes = attrs - - resource_sids.append(resource_sid) - elif sid_type is self.SidType.PRIMARY_GID: - self.assertIsNone(primary_gid, - f'must not specify a second primary GID ' - f'{sid}') - self.assertIsNone(attrs, 'cannot specify primary GID attrs') - - domain, primary_gid = sid.rsplit('-', 1) - self.assertEqual(domain_sid, domain, - f'primary GID {sid} must be in our domain') - else: - self.fail(f'invalid SID type {sid_type}') - - found_logon_info = True - - user_sid = security.dom_sid(f'{domain_sid}-{user_rid}') - - pac_buffers = pac.buffers - for pac_buffer in pac_buffers: - # Find the LOGON_INFO PAC buffer. - if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO: - logon_info = pac_buffer.info.info - - # Add Extra SIDs and set the EXTRA_SIDS flag as needed. - logon_info.info3.sidcount = len(extra_sids) - if extra_sids: - logon_info.info3.sids = extra_sids - logon_info.info3.base.user_flags |= ( - netlogon.NETLOGON_EXTRA_SIDS) - else: - logon_info.info3.sids = None - logon_info.info3.base.user_flags &= ~( - netlogon.NETLOGON_EXTRA_SIDS) - - # Add Base SIDs. - logon_info.info3.base.groups.count = len(base_sids) - if base_sids: - logon_info.info3.base.groups.rids = base_sids - else: - logon_info.info3.base.groups.rids = None - - logon_info.info3.base.domain_sid = security.dom_sid(domain_sid) - logon_info.info3.base.rid = int(user_rid) - - if primary_gid is not None: - logon_info.info3.base.primary_gid = int(primary_gid) - - # Add Resource SIDs and set the RESOURCE_GROUPS flag as needed. - logon_info.resource_groups.groups.count = len(resource_sids) - if resource_sids: - resource_domain = security.dom_sid(resource_domain) - logon_info.resource_groups.domain_sid = resource_domain - logon_info.resource_groups.groups.rids = resource_sids - logon_info.info3.base.user_flags |= ( - netlogon.NETLOGON_RESOURCE_GROUPS) - else: - logon_info.resource_groups.domain_sid = None - logon_info.resource_groups.groups.rids = None - logon_info.info3.base.user_flags &= ~( - netlogon.NETLOGON_RESOURCE_GROUPS) - - logon_info.info3.base.user_flags |= set_user_flags - logon_info.info3.base.user_flags &= ~reset_user_flags - - found_logon_info = True - - # Also replace the user's SID in the UPN DNS buffer. - elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO: - upn_dns_info_ex = pac_buffer.info.ex - - upn_dns_info_ex.objectsid = user_sid - - # But don't replace the user's SID in the Requester SID buffer, or - # we'll get a SID mismatch. - - self.assertTrue(found_logon_info, 'no LOGON_INFO PAC buffer') - - pac.buffers = pac_buffers - - return pac - # A list of test cases. cases = [ # AS-REQ tests. diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py index 5d12c48a250..a01504403db 100644 --- a/python/samba/tests/krb5/kdc_base_test.py +++ b/python/samba/tests/krb5/kdc_base_test.py @@ -23,6 +23,7 @@ sys.path.insert(0, "bin/python") os.environ["PYTHONUNBUFFERED"] = "1" from datetime import datetime, timezone +from functools import partial import tempfile import binascii import collections @@ -1105,6 +1106,163 @@ class KDCBaseTest(RawKerberosTest): return mapped_sids + # Get a ticket with the SIDs in the PAC replaced with ones we specify. This + # is useful for creating arbitrary tickets that can be used to perform a + # TGS-REQ. + def ticket_with_sids(self, + ticket, + new_sids, + domain_sid, + user_rid, + set_user_flags=0, + reset_user_flags=0): + krbtgt_creds = self.get_krbtgt_creds() + krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) + + checksum_keys = { + krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key + } + + modify_pac_fn = partial(self.set_pac_sids, + new_sids=new_sids, + domain_sid=domain_sid, + user_rid=user_rid, + set_user_flags=set_user_flags, + reset_user_flags=reset_user_flags) + + return self.modified_ticket(ticket, + modify_pac_fn=modify_pac_fn, + checksum_keys=checksum_keys) + + # Replace the SIDs in a PAC with 'new_sids'. + def set_pac_sids(self, + pac, + new_sids, + domain_sid, + user_rid, + set_user_flags=0, + reset_user_flags=0): + base_sids = [] + extra_sids = [] + resource_sids = [] + + resource_domain = None + + primary_gid = None + + # Filter our SIDs into three arrays depending on their ultimate + # location in the PAC. + for sid, sid_type, attrs in new_sids: + if sid_type is self.SidType.BASE_SID: + domain, rid = sid.rsplit('-', 1) + self.assertEqual(domain_sid, domain, + f'base SID {sid} must be in our domain') + + base_sid = samr.RidWithAttribute() + base_sid.rid = int(rid) + base_sid.attributes = attrs + + base_sids.append(base_sid) + elif sid_type is self.SidType.EXTRA_SID: + extra_sid = netlogon.netr_SidAttr() + extra_sid.sid = security.dom_sid(sid) + extra_sid.attributes = attrs + + extra_sids.append(extra_sid) + elif sid_type is self.SidType.RESOURCE_SID: + domain, rid = sid.rsplit('-', 1) + if resource_domain is None: + resource_domain = domain + else: + self.assertEqual(resource_domain, domain, + 'resource SIDs must share the same ' + 'domain') + + resource_sid = samr.RidWithAttribute() + resource_sid.rid = int(rid) + resource_sid.attributes = attrs + + resource_sids.append(resource_sid) + elif sid_type is self.SidType.PRIMARY_GID: + self.assertIsNone(primary_gid, + f'must not specify a second primary GID ' + f'{sid}') + self.assertIsNone(attrs, 'cannot specify primary GID attrs') + + domain, primary_gid = sid.rsplit('-', 1) + self.assertEqual(domain_sid, domain, + f'primary GID {sid} must be in our domain') + else: + self.fail(f'invalid SID type {sid_type}') + + found_logon_info = True + + user_sid = security.dom_sid(f'{domain_sid}-{user_rid}') + + pac_buffers = pac.buffers + for pac_buffer in pac_buffers: + # Find the LOGON_INFO PAC buffer. + if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO: + logon_info = pac_buffer.info.info + + # Add Extra SIDs and set the EXTRA_SIDS flag as needed. + logon_info.info3.sidcount = len(extra_sids) + if extra_sids: + logon_info.info3.sids = extra_sids + logon_info.info3.base.user_flags |= ( + netlogon.NETLOGON_EXTRA_SIDS) + else: + logon_info.info3.sids = None + logon_info.info3.base.user_flags &= ~( + netlogon.NETLOGON_EXTRA_SIDS) + + # Add Base SIDs. + logon_info.info3.base.groups.count = len(base_sids) + if base_sids: + logon_info.info3.base.groups.rids = base_sids + else: + logon_info.info3.base.groups.rids = None + + logon_info.info3.base.domain_sid = security.dom_sid(domain_sid) + logon_info.info3.base.rid = int(user_rid) + + if primary_gid is not None: + logon_info.info3.base.primary_gid = int(primary_gid) + + # Add Resource SIDs and set the RESOURCE_GROUPS flag as needed. + logon_info.resource_groups.groups.count = len(resource_sids) + if resource_sids: + resource_domain = security.dom_sid(resource_domain) + logon_info.resource_groups.domain_sid = resource_domain + logon_info.resource_groups.groups.rids = resource_sids + logon_info.info3.base.user_flags |= ( + netlogon.NETLOGON_RESOURCE_GROUPS) + else: + logon_info.resource_groups.domain_sid = None + logon_info.resource_groups.groups.rids = None + logon_info.info3.base.user_flags &= ~( + netlogon.NETLOGON_RESOURCE_GROUPS) + + logon_info.info3.base.user_flags |= set_user_flags + logon_info.info3.base.user_flags &= ~reset_user_flags + + found_logon_info = True + + # Also replace the user's SID in the UPN DNS buffer. + elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO: + upn_dns_info_ex = pac_buffer.info.ex + + upn_dns_info_ex.objectsid = user_sid + + # But don't replace the user's SID in the Requester SID buffer, or + # we'll get a SID mismatch. + + self.assertTrue(found_logon_info, 'no LOGON_INFO PAC buffer') + + pac.buffers = pac_buffers + + return pac + def get_cached_creds(self, *, account_type, opts=None,