]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
tests/krb5: Move ticket_with_sids() to base class
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Mon, 20 Feb 2023 02:19:01 +0000 (15:19 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Fri, 3 Mar 2023 01:07:36 +0000 (01:07 +0000)
We need to use this in another test.

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/krb5/group_tests.py
python/samba/tests/krb5/kdc_base_test.py

index c146e926033d2b609899c30384c07686a1643b46..9ece5e642713683887aadb40bef52c8ede51503f 100755 (executable)
@@ -26,12 +26,10 @@ os.environ['PYTHONUNBUFFERED'] = '1'
 import random
 import re
 
-from functools import partial
-
 import ldb
 
 from samba import werror
-from samba.dcerpc import krb5pac, netlogon, samr, security
+from samba.dcerpc import netlogon, security
 from samba.tests import DynamicTestCase, env_get_var_value
 from samba.tests.krb5 import kcrypto
 from samba.tests.krb5.kdc_base_test import GroupType, KDCBaseTest, Principal
@@ -423,163 +421,6 @@ class GroupTests(KDCBaseTest):
         self.assertEqual(domain_local_sid, str(sids[0].sid))
         self.assertEqual(self.resource_attrs, sids[0].attributes)
 
-    # Get a ticket with the SIDs in the PAC replaced with ones we specify. This
-    # is useful for creating arbitrary tickets that can be used to perform a
-    # TGS-REQ.
-    def ticket_with_sids(self,
-                         ticket,
-                         new_sids,
-                         domain_sid,
-                         user_rid,
-                         set_user_flags=0,
-                         reset_user_flags=0):
-        krbtgt_creds = self.get_krbtgt_creds()
-        krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
-
-        checksum_keys = {
-            krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
-        }
-
-        modify_pac_fn = partial(self.set_pac_sids,
-                                new_sids=new_sids,
-                                domain_sid=domain_sid,
-                                user_rid=user_rid,
-                                set_user_flags=set_user_flags,
-                                reset_user_flags=reset_user_flags)
-
-        return self.modified_ticket(ticket,
-                                    modify_pac_fn=modify_pac_fn,
-                                    checksum_keys=checksum_keys)
-
-    # Replace the SIDs in a PAC with 'new_sids'.
-    def set_pac_sids(self,
-                     pac,
-                     new_sids,
-                     domain_sid,
-                     user_rid,
-                     set_user_flags=0,
-                     reset_user_flags=0):
-        base_sids = []
-        extra_sids = []
-        resource_sids = []
-
-        resource_domain = None
-
-        primary_gid = None
-
-        # Filter our SIDs into three arrays depending on their ultimate
-        # location in the PAC.
-        for sid, sid_type, attrs in new_sids:
-            if sid_type is self.SidType.BASE_SID:
-                domain, rid = sid.rsplit('-', 1)
-                self.assertEqual(domain_sid, domain,
-                                 f'base SID {sid} must be in our domain')
-
-                base_sid = samr.RidWithAttribute()
-                base_sid.rid = int(rid)
-                base_sid.attributes = attrs
-
-                base_sids.append(base_sid)
-            elif sid_type is self.SidType.EXTRA_SID:
-                extra_sid = netlogon.netr_SidAttr()
-                extra_sid.sid = security.dom_sid(sid)
-                extra_sid.attributes = attrs
-
-                extra_sids.append(extra_sid)
-            elif sid_type is self.SidType.RESOURCE_SID:
-                domain, rid = sid.rsplit('-', 1)
-                if resource_domain is None:
-                    resource_domain = domain
-                else:
-                    self.assertEqual(resource_domain, domain,
-                                     'resource SIDs must share the same '
-                                     'domain')
-
-                resource_sid = samr.RidWithAttribute()
-                resource_sid.rid = int(rid)
-                resource_sid.attributes = attrs
-
-                resource_sids.append(resource_sid)
-            elif sid_type is self.SidType.PRIMARY_GID:
-                self.assertIsNone(primary_gid,
-                                  f'must not specify a second primary GID '
-                                  f'{sid}')
-                self.assertIsNone(attrs, 'cannot specify primary GID attrs')
-
-                domain, primary_gid = sid.rsplit('-', 1)
-                self.assertEqual(domain_sid, domain,
-                                 f'primary GID {sid} must be in our domain')
-            else:
-                self.fail(f'invalid SID type {sid_type}')
-
-        found_logon_info = True
-
-        user_sid = security.dom_sid(f'{domain_sid}-{user_rid}')
-
-        pac_buffers = pac.buffers
-        for pac_buffer in pac_buffers:
-            # Find the LOGON_INFO PAC buffer.
-            if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
-                logon_info = pac_buffer.info.info
-
-                # Add Extra SIDs and set the EXTRA_SIDS flag as needed.
-                logon_info.info3.sidcount = len(extra_sids)
-                if extra_sids:
-                    logon_info.info3.sids = extra_sids
-                    logon_info.info3.base.user_flags |= (
-                        netlogon.NETLOGON_EXTRA_SIDS)
-                else:
-                    logon_info.info3.sids = None
-                    logon_info.info3.base.user_flags &= ~(
-                        netlogon.NETLOGON_EXTRA_SIDS)
-
-                # Add Base SIDs.
-                logon_info.info3.base.groups.count = len(base_sids)
-                if base_sids:
-                    logon_info.info3.base.groups.rids = base_sids
-                else:
-                    logon_info.info3.base.groups.rids = None
-
-                logon_info.info3.base.domain_sid = security.dom_sid(domain_sid)
-                logon_info.info3.base.rid = int(user_rid)
-
-                if primary_gid is not None:
-                    logon_info.info3.base.primary_gid = int(primary_gid)
-
-                # Add Resource SIDs and set the RESOURCE_GROUPS flag as needed.
-                logon_info.resource_groups.groups.count = len(resource_sids)
-                if resource_sids:
-                    resource_domain = security.dom_sid(resource_domain)
-                    logon_info.resource_groups.domain_sid = resource_domain
-                    logon_info.resource_groups.groups.rids = resource_sids
-                    logon_info.info3.base.user_flags |= (
-                        netlogon.NETLOGON_RESOURCE_GROUPS)
-                else:
-                    logon_info.resource_groups.domain_sid = None
-                    logon_info.resource_groups.groups.rids = None
-                    logon_info.info3.base.user_flags &= ~(
-                        netlogon.NETLOGON_RESOURCE_GROUPS)
-
-                logon_info.info3.base.user_flags |= set_user_flags
-                logon_info.info3.base.user_flags &= ~reset_user_flags
-
-                found_logon_info = True
-
-            # Also replace the user's SID in the UPN DNS buffer.
-            elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
-                upn_dns_info_ex = pac_buffer.info.ex
-
-                upn_dns_info_ex.objectsid = user_sid
-
-            # But don't replace the user's SID in the Requester SID buffer, or
-            # we'll get a SID mismatch.
-
-        self.assertTrue(found_logon_info, 'no LOGON_INFO PAC buffer')
-
-        pac.buffers = pac_buffers
-
-        return pac
-
     # A list of test cases.
     cases = [
         # AS-REQ tests.
index 5d12c48a25010e4b6ede227ef06b1dc501705c6e..a01504403dbf3412edc699620512dd0f0b935e42 100644 (file)
@@ -23,6 +23,7 @@ sys.path.insert(0, "bin/python")
 os.environ["PYTHONUNBUFFERED"] = "1"
 
 from datetime import datetime, timezone
+from functools import partial
 import tempfile
 import binascii
 import collections
@@ -1105,6 +1106,163 @@ class KDCBaseTest(RawKerberosTest):
 
         return mapped_sids
 
+    # Get a ticket with the SIDs in the PAC replaced with ones we specify. This
+    # is useful for creating arbitrary tickets that can be used to perform a
+    # TGS-REQ.
+    def ticket_with_sids(self,
+                         ticket,
+                         new_sids,
+                         domain_sid,
+                         user_rid,
+                         set_user_flags=0,
+                         reset_user_flags=0):
+        krbtgt_creds = self.get_krbtgt_creds()
+        krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
+
+        checksum_keys = {
+            krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
+        }
+
+        modify_pac_fn = partial(self.set_pac_sids,
+                                new_sids=new_sids,
+                                domain_sid=domain_sid,
+                                user_rid=user_rid,
+                                set_user_flags=set_user_flags,
+                                reset_user_flags=reset_user_flags)
+
+        return self.modified_ticket(ticket,
+                                    modify_pac_fn=modify_pac_fn,
+                                    checksum_keys=checksum_keys)
+
+    # Replace the SIDs in a PAC with 'new_sids'.
+    def set_pac_sids(self,
+                     pac,
+                     new_sids,
+                     domain_sid,
+                     user_rid,
+                     set_user_flags=0,
+                     reset_user_flags=0):
+        base_sids = []
+        extra_sids = []
+        resource_sids = []
+
+        resource_domain = None
+
+        primary_gid = None
+
+        # Filter our SIDs into three arrays depending on their ultimate
+        # location in the PAC.
+        for sid, sid_type, attrs in new_sids:
+            if sid_type is self.SidType.BASE_SID:
+                domain, rid = sid.rsplit('-', 1)
+                self.assertEqual(domain_sid, domain,
+                                 f'base SID {sid} must be in our domain')
+
+                base_sid = samr.RidWithAttribute()
+                base_sid.rid = int(rid)
+                base_sid.attributes = attrs
+
+                base_sids.append(base_sid)
+            elif sid_type is self.SidType.EXTRA_SID:
+                extra_sid = netlogon.netr_SidAttr()
+                extra_sid.sid = security.dom_sid(sid)
+                extra_sid.attributes = attrs
+
+                extra_sids.append(extra_sid)
+            elif sid_type is self.SidType.RESOURCE_SID:
+                domain, rid = sid.rsplit('-', 1)
+                if resource_domain is None:
+                    resource_domain = domain
+                else:
+                    self.assertEqual(resource_domain, domain,
+                                     'resource SIDs must share the same '
+                                     'domain')
+
+                resource_sid = samr.RidWithAttribute()
+                resource_sid.rid = int(rid)
+                resource_sid.attributes = attrs
+
+                resource_sids.append(resource_sid)
+            elif sid_type is self.SidType.PRIMARY_GID:
+                self.assertIsNone(primary_gid,
+                                  f'must not specify a second primary GID '
+                                  f'{sid}')
+                self.assertIsNone(attrs, 'cannot specify primary GID attrs')
+
+                domain, primary_gid = sid.rsplit('-', 1)
+                self.assertEqual(domain_sid, domain,
+                                 f'primary GID {sid} must be in our domain')
+            else:
+                self.fail(f'invalid SID type {sid_type}')
+
+        found_logon_info = True
+
+        user_sid = security.dom_sid(f'{domain_sid}-{user_rid}')
+
+        pac_buffers = pac.buffers
+        for pac_buffer in pac_buffers:
+            # Find the LOGON_INFO PAC buffer.
+            if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
+                logon_info = pac_buffer.info.info
+
+                # Add Extra SIDs and set the EXTRA_SIDS flag as needed.
+                logon_info.info3.sidcount = len(extra_sids)
+                if extra_sids:
+                    logon_info.info3.sids = extra_sids
+                    logon_info.info3.base.user_flags |= (
+                        netlogon.NETLOGON_EXTRA_SIDS)
+                else:
+                    logon_info.info3.sids = None
+                    logon_info.info3.base.user_flags &= ~(
+                        netlogon.NETLOGON_EXTRA_SIDS)
+
+                # Add Base SIDs.
+                logon_info.info3.base.groups.count = len(base_sids)
+                if base_sids:
+                    logon_info.info3.base.groups.rids = base_sids
+                else:
+                    logon_info.info3.base.groups.rids = None
+
+                logon_info.info3.base.domain_sid = security.dom_sid(domain_sid)
+                logon_info.info3.base.rid = int(user_rid)
+
+                if primary_gid is not None:
+                    logon_info.info3.base.primary_gid = int(primary_gid)
+
+                # Add Resource SIDs and set the RESOURCE_GROUPS flag as needed.
+                logon_info.resource_groups.groups.count = len(resource_sids)
+                if resource_sids:
+                    resource_domain = security.dom_sid(resource_domain)
+                    logon_info.resource_groups.domain_sid = resource_domain
+                    logon_info.resource_groups.groups.rids = resource_sids
+                    logon_info.info3.base.user_flags |= (
+                        netlogon.NETLOGON_RESOURCE_GROUPS)
+                else:
+                    logon_info.resource_groups.domain_sid = None
+                    logon_info.resource_groups.groups.rids = None
+                    logon_info.info3.base.user_flags &= ~(
+                        netlogon.NETLOGON_RESOURCE_GROUPS)
+
+                logon_info.info3.base.user_flags |= set_user_flags
+                logon_info.info3.base.user_flags &= ~reset_user_flags
+
+                found_logon_info = True
+
+            # Also replace the user's SID in the UPN DNS buffer.
+            elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
+                upn_dns_info_ex = pac_buffer.info.ex
+
+                upn_dns_info_ex.objectsid = user_sid
+
+            # But don't replace the user's SID in the Requester SID buffer, or
+            # we'll get a SID mismatch.
+
+        self.assertTrue(found_logon_info, 'no LOGON_INFO PAC buffer')
+
+        pac.buffers = pac_buffers
+
+        return pac
+
     def get_cached_creds(self, *,
                          account_type,
                          opts=None,