]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
selftest: krb5 account creation: clarify account type as an enum
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Fri, 8 Oct 2021 02:40:09 +0000 (15:40 +1300)
committerStefan Metzmacher <metze@samba.org>
Wed, 27 Oct 2021 22:37:10 +0000 (22:37 +0000)
This makes the code clearer with a symbolic constant rather
than a True/False boolean.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14869
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14881

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
(cherry picked from commit 49306f74eb29a2192019fab9260f9d242f9d5fd9)

python/samba/tests/krb5/as_canonicalization_tests.py
python/samba/tests/krb5/kdc_base_test.py
python/samba/tests/krb5/kdc_tgs_tests.py
python/samba/tests/krb5/ms_kile_client_principal_lookup_tests.py
python/samba/tests/krb5/rodc_tests.py
python/samba/tests/krb5/s4u_tests.py
python/samba/tests/krb5/test_ccache.py

index 9538d0ae3cfcc51c32abe00b15379085bbb540f9..674fcb3710165a9e71b629cb60592a0ef9f7ed2b 100755 (executable)
@@ -171,9 +171,10 @@ class KerberosASCanonicalizationTests(KDCBaseTest):
     def machine_account_creds(self):
         if self.machine_creds is None:
             samdb = self.get_samdb()
-            self.machine_creds, _ = self.create_account(samdb,
-                                                        MACHINE_NAME,
-                                                        machine_account=True)
+            self.machine_creds, _ = self.create_account(
+                samdb,
+                MACHINE_NAME,
+                account_type=self.AccountType.COMPUTER)
             self.machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
             self.machine_creds.set_kerberos_state(DONT_USE_KERBEROS)
 
index 1fc15315b0b08655aba49721391772ba4457a67e..7cd3c5255f22b3814873036a537fa7ac8c511277 100644 (file)
@@ -23,6 +23,7 @@ import tempfile
 import binascii
 import collections
 import secrets
+from enum import Enum, auto
 
 from collections import namedtuple
 import ldb
@@ -90,6 +91,10 @@ class KDCBaseTest(RawKerberosTest):
     """ Base class for KDC tests.
     """
 
+    class AccountType(Enum):
+        USER = auto()
+        COMPUTER = auto()
+
     @classmethod
     def setUpClass(cls):
         super().setUpClass()
@@ -230,7 +235,7 @@ class KDCBaseTest(RawKerberosTest):
 
         return default_enctypes
 
-    def create_account(self, samdb, name, machine_account=False,
+    def create_account(self, samdb, name, account_type=AccountType.USER,
                        spn=None, upn=None, additional_details=None,
                        ou=None, account_control=0):
         '''Create an account for testing.
@@ -238,8 +243,10 @@ class KDCBaseTest(RawKerberosTest):
            which is used by tearDownClass to clean up the created accounts.
         '''
         if ou is None:
-            guid = (DS_GUID_COMPUTERS_CONTAINER if machine_account
-                    else DS_GUID_USERS_CONTAINER)
+            if account_type is account_type.COMPUTER:
+                guid = DS_GUID_COMPUTERS_CONTAINER
+            else:
+                guid = DS_GUID_USERS_CONTAINER
 
             ou = samdb.get_wellknown_dn(samdb.get_default_basedn(), guid)
 
@@ -248,14 +255,17 @@ class KDCBaseTest(RawKerberosTest):
         # remove the account if it exists, this will happen if a previous test
         # run failed
         delete_force(samdb, dn)
-        if machine_account:
-            object_class = "computer"
-            account_name = "%s$" % name
-            account_control |= UF_WORKSTATION_TRUST_ACCOUNT
-        else:
+        if account_type is self.AccountType.USER:
             object_class = "user"
             account_name = name
             account_control |= UF_NORMAL_ACCOUNT
+        else:
+            object_class = "computer"
+            account_name = "%s$" % name
+            if account_type is self.AccountType.COMPUTER:
+                account_control |= UF_WORKSTATION_TRUST_ACCOUNT
+            else:
+                self.fail()
 
         password = generate_random_password(32, 32)
         utf16pw = ('"%s"' % password).encode('utf-16-le')
@@ -267,6 +277,10 @@ class KDCBaseTest(RawKerberosTest):
             "userAccountControl": str(account_control),
             "unicodePwd": utf16pw}
         if spn is not None:
+            if isinstance(spn, str):
+                spn = spn.format(account=account_name)
+            else:
+                spn = tuple(s.format(account=account_name) for s in spn)
             details["servicePrincipalName"] = spn
         if upn is not None:
             details["userPrincipalName"] = upn
@@ -280,10 +294,10 @@ class KDCBaseTest(RawKerberosTest):
         creds.set_domain(samdb.domain_netbios_name().upper())
         creds.set_password(password)
         creds.set_username(account_name)
-        if machine_account:
-            creds.set_workstation(name)
-        else:
+        if account_type is self.AccountType.USER:
             creds.set_workstation('')
+        else:
+            creds.set_workstation(name)
         creds.set_dn(ldb.Dn(samdb, dn))
         creds.set_spn(spn)
         #
@@ -609,13 +623,14 @@ class KDCBaseTest(RawKerberosTest):
         return cleanup
 
     def get_cached_creds(self, *,
-                         machine_account,
+                         account_type,
                          opts=None,
                          use_cache=True):
         if opts is None:
             opts = {}
 
         opts_default = {
+            'spn': None,
             'allowed_replication': False,
             'allowed_replication_mock': False,
             'denied_replication': False,
@@ -632,7 +647,7 @@ class KDCBaseTest(RawKerberosTest):
         }
 
         account_opts = {
-            'machine_account': machine_account,
+            'account_type': account_type,
             **opts_default,
             **opts
         }
@@ -651,7 +666,8 @@ class KDCBaseTest(RawKerberosTest):
         return creds
 
     def create_account_opts(self, *,
-                            machine_account,
+                            account_type,
+                            spn,
                             allowed_replication,
                             allowed_replication_mock,
                             denied_replication,
@@ -665,12 +681,13 @@ class KDCBaseTest(RawKerberosTest):
                             delegation_from_dn,
                             trusted_to_auth_for_delegation,
                             fast_support):
-        if machine_account:
-            self.assertFalse(not_delegated)
-        else:
+        if account_type is self.AccountType.USER:
+            self.assertIsNone(spn)
             self.assertIsNone(delegation_to_spn)
             self.assertIsNone(delegation_from_dn)
             self.assertFalse(trusted_to_auth_for_delegation)
+        else:
+            self.assertFalse(not_delegated)
 
         samdb = self.get_samdb()
         rodc_samdb = self.get_rodc_samdb()
@@ -707,13 +724,11 @@ class KDCBaseTest(RawKerberosTest):
             details['msDS-AllowedToActOnBehalfOfOtherIdentity'] = (
                 security_descriptor)
 
-        if machine_account:
+        if spn is None and account_type is not self.AccountType.USER:
             spn = 'host/' + user_name
-        else:
-            spn = None
 
         creds, dn = self.create_account(samdb, user_name,
-                                        machine_account=machine_account,
+                                        account_type=account_type,
                                         spn=spn,
                                         additional_details=details,
                                         account_control=user_account_control)
@@ -787,7 +802,7 @@ class KDCBaseTest(RawKerberosTest):
                          allow_missing_password=False,
                          allow_missing_keys=True):
         def create_client_account():
-            return self.get_cached_creds(machine_account=False)
+            return self.get_cached_creds(account_type=self.AccountType.USER)
 
         c = self._get_krb5_creds(prefix='CLIENT',
                                  allow_missing_password=allow_missing_password,
@@ -799,7 +814,7 @@ class KDCBaseTest(RawKerberosTest):
                        allow_missing_password=False,
                        allow_missing_keys=True):
         def create_mach_account():
-            return self.get_cached_creds(machine_account=True,
+            return self.get_cached_creds(account_type=self.AccountType.COMPUTER,
                                          opts={'fast_support': True})
 
         c = self._get_krb5_creds(prefix='MAC',
@@ -813,7 +828,7 @@ class KDCBaseTest(RawKerberosTest):
                           allow_missing_keys=True):
         def create_service_account():
             return self.get_cached_creds(
-                machine_account=True,
+                account_type=self.AccountType.COMPUTER,
                 opts={
                     'trusted_to_auth_for_delegation': True,
                     'fast_support': True
index 9d846a2c3ad516802aa5a0f5062fadc55b6be023..f36704f998cf071efc6a0f464a224a0dad4cf7b4 100755 (executable)
@@ -148,7 +148,8 @@ class KdcTgsTests(KDCBaseTest):
         samdb = self.get_samdb()
         user_name = "tsttktusr"
         (uc, dn) = self.create_account(samdb, user_name)
-        (mc, _) = self.create_account(samdb, "tsttktmac", machine_account=True)
+        (mc, _) = self.create_account(samdb, "tsttktmac",
+                                      account_type=self.AccountType.COMPUTER)
         realm = uc.get_realm().lower()
 
         # Do the initial AS-REQ, should get a pre-authentication required
@@ -282,7 +283,7 @@ class KdcTgsTests(KDCBaseTest):
 
     def test_client_no_auth_data_required(self):
         client_creds = self.get_cached_creds(
-            machine_account=False,
+            account_type=self.AccountType.USER,
             opts={'no_auth_data_required': True})
         service_creds = self.get_service_creds()
 
@@ -299,7 +300,7 @@ class KdcTgsTests(KDCBaseTest):
     def test_service_no_auth_data_required(self):
         client_creds = self.get_client_creds()
         service_creds = self.get_cached_creds(
-            machine_account=True,
+            account_type=self.AccountType.COMPUTER,
             opts={'no_auth_data_required': True})
 
         tgt = self.get_tgt(client_creds)
index 2ee3d4a2a83f7a4b37d92b2efe6494a941f34267..0aa3309b81494c469d56887f0f17fe9d7291ffa6 100755 (executable)
@@ -95,7 +95,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         realm = uc.get_realm().lower()
 
         mach_name = "mskilemac"
-        (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, _) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
 
         # Do the initial AS-REQ, should get a pre-authentication required
         # response
@@ -151,7 +152,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         #
         samdb = self.get_samdb()
         mach_name = "mskilemac"
-        (mc, dn) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, dn) = self.create_account(samdb, mach_name,
+                                       account_type=self.AccountType.COMPUTER)
         realm = mc.get_realm().lower()
 
         # Do the initial AS-REQ, should get a pre-authentication required
@@ -215,7 +217,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         realm = uc.get_realm().lower()
 
         mach_name = "mskilemac"
-        (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, _) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
 
         # Do the initial AS-REQ, should get a pre-authentication required
         # response
@@ -286,7 +289,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
 
         mach_name = "mskilemac"
-        (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, _) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
 
         # Do the initial AS-REQ, as we've set UF_DONT_REQUIRE_PREAUTH
         # we should get a valid AS-RESP
@@ -351,7 +355,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
 
         mach_name = "mskilemac"
-        (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, _) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
 
         # Do the initial AS-REQ, should get a pre-authentication required
         # response
@@ -420,7 +425,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         self.add_attribute(samdb, dn, "altSecurityIdentities", alt_sec)
 
         mach_name = "mskilemac"
-        (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, _) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
 
         # Do the initial AS-REQ, should get a pre-authentication required
         # response
@@ -459,7 +465,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         realm = uc.get_realm().lower()
 
         mach_name = "mskilemac"
-        (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, _) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
 
         # Do the initial AS-REQ, should get a pre-authentication required
         # response
@@ -523,7 +530,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         ename = user_name + "@" + realm
 
         mach_name = "mskilemac"
-        (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, _) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
 
         # Do the initial AS-REQ, should get a pre-authentication required
         # response
@@ -586,7 +594,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         realm = uc.get_realm().lower()
 
         mach_name = "mskilemac"
-        (mc, dn) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, dn) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
         ename = mach_name + "@" + realm
         uname = mach_name + "$@" + realm
 
@@ -661,7 +670,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         ename = alt_name + "@" + realm
 
         mach_name = "mskilemac"
-        (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, _) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
 
         # Do the initial AS-REQ, as we've set UF_DONT_REQUIRE_PREAUTH
         # we should get a valid AS-RESP
@@ -728,7 +738,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         uname = user_name + "@" + realm
 
         mach_name = "mskilemac"
-        (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, _) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
 
         # Do the initial AS-REQ, should get a pre-authentication required
         # response
@@ -798,7 +809,8 @@ class MS_Kile_Client_Principal_Lookup_Tests(KDCBaseTest):
         ename = alt_name + "@" + realm
 
         mach_name = "mskilemac"
-        (mc, _) = self.create_account(samdb, mach_name, machine_account=True)
+        (mc, _) = self.create_account(samdb, mach_name,
+                                      account_type=self.AccountType.COMPUTER)
 
         # Do the initial AS-REQ, should get a pre-authentication required
         # response
index 4579f9eb55267e239cf0165041aef37025824748..302ae865cf18db63538c9735fc1348fdcf5e894f 100755 (executable)
@@ -39,12 +39,12 @@ class RodcKerberosTests(KDCBaseTest):
     # and including the RODCIdentifier.
     def test_rodc_ticket_signature(self):
         user_creds = self.get_cached_creds(
-            machine_account=False,
+            account_type=self.AccountType.USER,
             opts={
                 'revealed_to_rodc': True
             })
         target_creds = self.get_cached_creds(
-            machine_account=True,
+            account_type=self.AccountType.COMPUTER,
             opts={
                 'revealed_to_rodc': True
             })
index bbb7135b55b38fa6467dac78a52055e8cfe9d00d..ea629d2970674c28e85fb88807a4caacb1672c22 100755 (executable)
@@ -220,12 +220,14 @@ class S4UKerberosTests(KDCBaseTest):
 
     def _run_s4u2self_test(self, kdc_dict):
         client_opts = kdc_dict.pop('client_opts', None)
-        client_creds = self.get_cached_creds(machine_account=False,
-                                             opts=client_opts)
+        client_creds = self.get_cached_creds(
+            account_type=self.AccountType.USER,
+            opts=client_opts)
 
         service_opts = kdc_dict.pop('service_opts', None)
-        service_creds = self.get_cached_creds(machine_account=True,
-                                              opts=service_opts)
+        service_creds = self.get_cached_creds(
+            account_type=self.AccountType.COMPUTER,
+            opts=service_opts)
 
         service_tgt = self.get_tgt(service_creds)
         modify_service_tgt_fn = kdc_dict.pop('modify_service_tgt_fn', None)
@@ -432,8 +434,9 @@ class S4UKerberosTests(KDCBaseTest):
 
     def _run_delegation_test(self, kdc_dict):
         client_opts = kdc_dict.pop('client_opts', None)
-        client_creds = self.get_cached_creds(machine_account=False,
-                                             opts=client_opts)
+        client_creds = self.get_cached_creds(
+            account_type=self.AccountType.USER,
+            opts=client_opts)
 
         service1_opts = kdc_dict.pop('service1_opts', {})
         service2_opts = kdc_dict.pop('service2_opts', {})
@@ -443,24 +446,28 @@ class S4UKerberosTests(KDCBaseTest):
         self.assertFalse(allow_delegation and allow_rbcd)
 
         if allow_rbcd:
-            service1_creds = self.get_cached_creds(machine_account=True,
-                                                   opts=service1_opts)
+            service1_creds = self.get_cached_creds(
+                account_type=self.AccountType.COMPUTER,
+                opts=service1_opts)
 
             self.assertNotIn('delegation_from_dn', service2_opts)
             service2_opts['delegation_from_dn'] = str(service1_creds.get_dn())
 
-            service2_creds = self.get_cached_creds(machine_account=True,
-                                                   opts=service2_opts)
+            service2_creds = self.get_cached_creds(
+                account_type=self.AccountType.COMPUTER,
+                opts=service2_opts)
         else:
-            service2_creds = self.get_cached_creds(machine_account=True,
-                                                   opts=service2_opts)
+            service2_creds = self.get_cached_creds(
+                account_type=self.AccountType.COMPUTER,
+                opts=service2_opts)
 
             if allow_delegation:
                 self.assertNotIn('delegation_to_spn', service1_opts)
                 service1_opts['delegation_to_spn'] = service2_creds.get_spn()
 
-            service1_creds = self.get_cached_creds(machine_account=True,
-                                                   opts=service1_opts)
+            service1_creds = self.get_cached_creds(
+                account_type=self.AccountType.COMPUTER,
+                opts=service1_opts)
 
         client_tkt_options = kdc_dict.pop('client_tkt_options', 'forwardable')
         expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
index c44ea02d504b18e3d8a14f16ec93ab40e7e8d71c..6a2b78398ac3e5572aab20c7217b42a1d9a70bba 100755 (executable)
@@ -55,11 +55,12 @@ class CcacheTests(KDCBaseTest):
         (user_credentials, _) = self.create_account(samdb, user_name)
 
         # Create the machine account.
-        (mach_credentials, _) = self.create_account(samdb,
-                                                    mach_name,
-                                                    machine_account=True,
-                                                    spn="%s/%s" % (service,
-                                                                   mach_name))
+        (mach_credentials, _) = self.create_account(
+            samdb,
+            mach_name,
+            account_type=self.AccountType.COMPUTER,
+            spn="%s/%s" % (service,
+                           mach_name))
 
         # Talk to the KDC to obtain the service ticket, which gets placed into
         # the cache. The machine account name has to match the name in the