]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
tests/krb5: Overhaul PAC logon info group checking
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Thu, 3 Nov 2022 01:54:23 +0000 (14:54 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Tue, 8 Nov 2022 02:39:37 +0000 (02:39 +0000)
We can now verify attributes of SIDs and the PAC locations in which SIDs
are placed. We also gain the ability to assert that no SIDs are present
in the PAC other than the ones we expect.

We lighten somewhat the requirement that no duplicates are present among
the SIDs, as such a situation may arise even with Windows, especially if
group types are changed. For example, if a Universal group containing a
user is changed to a Domain-Local group in between an AS-REQ and a
TGS-REQ, the group's SID will be added to the PAC once for each request.
We only verify that there are no exact duplicates (SID, attributes, and
PAC location all being identical).

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/krb5/raw_testcase.py
python/samba/tests/krb5/s4u_tests.py

index 172ad3bf14669c3eeee1f3fbee32ee1d4b310f2b..1fe29581204d50d551158d450a768b08672e2a62 100644 (file)
@@ -38,7 +38,7 @@ from pyasn1.codec.ber.encoder import BitStringEncoder
 from pyasn1.error import PyAsn1Error
 
 from samba.credentials import Credentials
-from samba.dcerpc import claims, krb5pac, security
+from samba.dcerpc import claims, krb5pac, netlogon, security
 from samba.gensec import FEATURE_SEAL
 from samba.ndr import ndr_pack, ndr_unpack
 
@@ -549,6 +549,12 @@ class RawKerberosTest(TestCaseInTempDir):
         SET = object()
         CHANGE = object()
 
+    # The location of a SID within the PAC
+    class SidType(Enum):
+        BASE_SID = object()  # in info3.base.groups
+        EXTRA_SID = object()  # in info3.sids
+        RESOURCE_SID = object()  # in resource_groups
+
     pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
                           krb5pac.PAC_TYPE_KDC_CHECKSUM,
                           krb5pac.PAC_TYPE_TICKET_CHECKSUM}
@@ -3124,6 +3130,106 @@ class RawKerberosTest(TestCaseInTempDir):
 
         kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
 
+    # Check the SIDs in a LOGON_INFO PAC buffer.
+    def check_logon_info_sids(self, logon_info_buffer, kdc_exchange_dict):
+        info3 = logon_info_buffer.info.info.info3
+        logon_info = info3.base
+        resource_groups = logon_info_buffer.info.info.resource_groups
+
+        expected_groups = kdc_exchange_dict['expected_groups']
+        unexpected_groups = kdc_exchange_dict['unexpected_groups']
+        expected_sid = kdc_exchange_dict['expected_sid']
+
+        domain_sid = logon_info.domain_sid
+
+        if expected_sid is not None:
+            got_sid = f'{domain_sid}-{logon_info.rid}'
+            self.assertEqual(expected_sid, got_sid)
+
+        if expected_groups is None and unexpected_groups is None:
+            # Nothing more to do.
+            return
+
+        # Check the SIDs in the PAC.
+
+        # A representation of the PAC.
+        pac_sids = set()
+
+        # Collect the Extra SIDs.
+        if info3.sids is not None:
+            self.assertTrue(logon_info.user_flags & (
+                netlogon.NETLOGON_EXTRA_SIDS),
+                            'extra SIDs present, but EXTRA_SIDS flag not set')
+            self.assertTrue(info3.sids, 'got empty SIDs')
+
+            for sid_attr in info3.sids:
+                got_sid = str(sid_attr.sid)
+                if unexpected_groups is not None:
+                    self.assertNotIn(got_sid, unexpected_groups)
+
+                pac_sid = (got_sid,
+                           self.SidType.EXTRA_SID,
+                           sid_attr.attributes)
+                self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID')
+                pac_sids.add(pac_sid)
+        else:
+            self.assertFalse(logon_info.user_flags & (
+                netlogon.NETLOGON_EXTRA_SIDS),
+                             'no extra SIDs present, but EXTRA_SIDS flag set')
+
+        # Collect the Base RIDs.
+        if logon_info.groups.rids is not None:
+            self.assertTrue(logon_info.groups.rids, 'got empty RIDs')
+
+            for group in logon_info.groups.rids:
+                got_sid = f'{domain_sid}-{group.rid}'
+                if unexpected_groups is not None:
+                    self.assertNotIn(got_sid, unexpected_groups)
+
+                pac_sid = (got_sid, self.SidType.BASE_SID, group.attributes)
+                self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID')
+                pac_sids.add(pac_sid)
+
+        # Collect the Resource SIDs.
+        if resource_groups.groups.rids is not None:
+            self.assertTrue(logon_info.user_flags & (
+                netlogon.NETLOGON_RESOURCE_GROUPS),
+                            'resource groups present, but RESOURCE_GROUPS '
+                            'flag not set')
+            self.assertTrue(resource_groups.groups.rids, 'got empty RIDs')
+
+            resource_group_sid = resource_groups.domain_sid
+            for resource_group in resource_groups.groups.rids:
+                got_sid = f'{resource_group_sid}-{resource_group.rid}'
+                if unexpected_groups is not None:
+                    self.assertNotIn(got_sid, unexpected_groups)
+
+                pac_sid = (got_sid,
+                           self.SidType.RESOURCE_SID,
+                           resource_group.attributes)
+                self.assertNotIn(pac_sid, pac_sids, 'got duplicated SID')
+                pac_sids.add(pac_sid)
+        else:
+            self.assertFalse(logon_info.user_flags & (
+                netlogon.NETLOGON_RESOURCE_GROUPS),
+                             'no resource groups present, but RESOURCE_GROUPS '
+                             'flag set')
+
+        # Compare the aggregated SIDs against the set of expected SIDs.
+        if expected_groups is not None:
+            if ... in expected_groups:
+                # The caller is only interested in asserting the
+                # presence of particular groups, and doesn't mind if
+                # other groups are present as well.
+                pac_sids.add(...)
+                self.assertLessEqual(expected_groups, pac_sids,
+                                     'expected groups')
+            else:
+                # The caller wants to make sure the groups match
+                # exactly.
+                self.assertEqual(expected_groups, pac_sids,
+                                 'expected != got')
+
     def check_pac_buffers(self, pac_data, kdc_exchange_dict):
         pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
 
@@ -3249,8 +3355,6 @@ class RawKerberosTest(TestCaseInTempDir):
             unchecked=unchecked)
 
         expected_account_name = kdc_exchange_dict['expected_account_name']
-        expected_groups = kdc_exchange_dict['expected_groups']
-        unexpected_groups = kdc_exchange_dict['unexpected_groups']
         expected_sid = kdc_exchange_dict['expected_sid']
 
         expect_upn_dns_info_ex = kdc_exchange_dict['expect_upn_dns_info_ex']
@@ -3290,33 +3394,7 @@ class RawKerberosTest(TestCaseInTempDir):
                     self.assertEqual(expected_account_name,
                                      str(logon_info.account_name))
 
-                if expected_sid is not None:
-                    expected_rid = int(expected_sid.rsplit('-', 1)[1])
-                    self.assertEqual(expected_rid, logon_info.rid)
-
-                if expected_groups is not None:
-                    self.assertIsNotNone(info3.sids)
-                    got_sids = {str(sid_attr.sid) for sid_attr in info3.sids}
-                    self.assertEqual(info3.sidcount,
-                                     len(got_sids),
-                                     'Found duplicate SIDs')
-
-                    match_count = 0
-                    for g in expected_groups:
-                        for sid_attr in info3.sids:
-                            if g == str(sid_attr.sid):
-                                match_count += 1
-                    self.assertEqual(match_count, len(expected_groups))
-
-                if unexpected_groups is not None:
-                    match_count = 0
-
-                    for g in unexpected_groups:
-                        self.assertIsNotNone(info3.sids)
-                        for sid_attr in info3.sids:
-                            if g == str(sid_attr.sid):
-                                match_count += 1
-                    self.assertEqual(match_count, 0)
+                self.check_logon_info_sids(pac_buffer, kdc_exchange_dict)
 
             elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
                 upn_dns_info = pac_buffer.info
index 482e95ae02e3c7c242a9f181ca1f76da0a9596ac..49727685f72b03623326906c678cbd9e787523e8 100755 (executable)
@@ -30,6 +30,7 @@ from samba.tests import env_get_var_value
 from samba.tests.krb5.kcrypto import Cksumtype, Enctype
 from samba.tests.krb5.kdc_base_test import KDCBaseTest
 from samba.tests.krb5.raw_testcase import (
+    RawKerberosTest,
     RodcPacEncryptionKey,
     ZeroedChecksumKey
 )
@@ -51,12 +52,18 @@ from samba.tests.krb5.rfc4120_constants import (
 )
 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
 
+SidType = RawKerberosTest.SidType
+
 global_asn1_print = False
 global_hexdump = False
 
 
 class S4UKerberosTests(KDCBaseTest):
 
+    default_attrs = (security.SE_GROUP_MANDATORY |
+                     security.SE_GROUP_ENABLED_BY_DEFAULT |
+                     security.SE_GROUP_ENABLED)
+
     def setUp(self):
         super(S4UKerberosTests, self).setUp()
         self.do_asn1_print = global_asn1_print
@@ -541,8 +548,15 @@ class S4UKerberosTests(KDCBaseTest):
                 'client_opts': {
                     'not_delegated': False
                 },
-                'expected_groups': [security.SID_SERVICE_ASSERTED_IDENTITY],
-                'unexpected_groups': [security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY]
+                'expected_groups': {
+                    (security.SID_SERVICE_ASSERTED_IDENTITY,
+                     SidType.EXTRA_SID,
+                     self.default_attrs),
+                    ...
+                },
+                'unexpected_groups': {
+                    security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
+                },
             })
 
     def _run_delegation_test(self, kdc_dict):
@@ -783,8 +797,15 @@ class S4UKerberosTests(KDCBaseTest):
             {
                 'expected_error_mode': 0,
                 'allow_delegation': True,
-                'expected_groups': [security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY],
-                'unexpected_groups': [security.SID_SERVICE_ASSERTED_IDENTITY]
+                'expected_groups': {
+                    (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
+                     SidType.EXTRA_SID,
+                     self.default_attrs),
+                    ...
+                },
+                'unexpected_groups': {
+                    security.SID_SERVICE_ASSERTED_IDENTITY,
+                },
             })
 
     def test_constrained_delegation_service_asserted_identity(self):
@@ -798,8 +819,15 @@ class S4UKerberosTests(KDCBaseTest):
                 'service1_opts': {
                     'trusted_to_auth_for_delegation': True,
                 },
-                'expected_groups': [security.SID_SERVICE_ASSERTED_IDENTITY],
-                'unexpected_groups': [security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY]
+                'expected_groups': {
+                    (security.SID_SERVICE_ASSERTED_IDENTITY,
+                     SidType.EXTRA_SID,
+                     self.default_attrs),
+                    ...
+                },
+                'unexpected_groups': {
+                    security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY,
+                },
             })
 
     def test_constrained_delegation_no_auth_data_required(self):