]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
tests/krb5: Cache created authentication policies
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Tue, 13 Jun 2023 23:12:15 +0000 (11:12 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 14 Jun 2023 22:57:35 +0000 (22:57 +0000)
View with ‘git show -b’.

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/krb5/claims_tests.py
python/samba/tests/krb5/kdc_base_test.py

index 0144e961818e3d64c062b548d6692dc7aa02dc5b..66a62303183c31d6681584db30ea532394569d64 100755 (executable)
@@ -716,10 +716,6 @@ class ClaimsTests(KDCBaseTest):
                                          etypes=etypes)
         self.check_reply(rep, KRB_TGS_REP)
 
-    @staticmethod
-    def freeze(m):
-        return frozenset((k, v) for k, v in m.items())
-
     @classmethod
     def setUpDynamicTestCases(cls):
         FILTER = env_get_var_value('FILTER', allow_missing=True)
index 26b2dfbe655260e6cc07b5be0f07ff4b2ae8260c..e2d328d1dc705505fb1350a4dc67d2b0fac7495b 100644 (file)
@@ -177,6 +177,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         cls.accounts = []
 
         cls.account_cache = {}
+        cls.policy_cache = {}
         cls.tkt_cache = {}
 
         cls._rodc_ctx = None
@@ -306,6 +307,10 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         # Return a copy of the DN.
         return ldb.Dn(samdb, str(self._authn_silos_dn))
 
+    @staticmethod
+    def freeze(m):
+        return frozenset((k, v) for k, v in m.items())
+
     def tearDown(self):
         # Run any cleanups that may modify accounts prior to deleting those
         # accounts.
@@ -577,20 +582,41 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
 
         return claim_id
 
-    def create_authn_policy(self,
-                            policy_id,
-                            enforced=None,
-                            strong_ntlm_policy=None,
-                            user_allowed_from=None,
-                            user_allowed_ntlm=None,
-                            user_allowed_to=None,
-                            user_tgt_lifetime=None,
-                            computer_allowed_to=None,
-                            computer_tgt_lifetime=None,
-                            service_allowed_from=None,
-                            service_allowed_ntlm=None,
-                            service_allowed_to=None,
-                            service_tgt_lifetime=None):
+    def create_authn_policy(self, *args,
+                            use_cache=True,
+                            **kwargs):
+
+        if use_cache:
+            cache_key = self.freeze({
+                args: None,
+                **kwargs,
+            })
+
+            authn_policy = self.policy_cache.get(cache_key)
+            if authn_policy is not None:
+                return authn_policy
+
+        authn_policy = self.create_authn_policy_opts(*args, **kwargs)
+        if use_cache:
+            self.policy_cache[cache_key] = authn_policy
+
+        return authn_policy
+
+    def create_authn_policy_opts(self,
+                                 policy_id,
+                                 *,
+                                 enforced=None,
+                                 strong_ntlm_policy=None,
+                                 user_allowed_from=None,
+                                 user_allowed_ntlm=None,
+                                 user_allowed_to=None,
+                                 user_tgt_lifetime=None,
+                                 computer_allowed_to=None,
+                                 computer_tgt_lifetime=None,
+                                 service_allowed_from=None,
+                                 service_allowed_ntlm=None,
+                                 service_allowed_to=None,
+                                 service_tgt_lifetime=None):
         samdb = self.get_samdb()
 
         policy_dn = self.get_authn_policies_dn()