From: Joseph Sutton Date: Tue, 13 Jun 2023 23:12:15 +0000 (+1200) Subject: tests/krb5: Cache created authentication policies X-Git-Tag: talloc-2.4.1~385 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9d8ee6a422277da8145ca30cd76c9e74263f0b14;p=thirdparty%2Fsamba.git tests/krb5: Cache created authentication policies View with ‘git show -b’. Signed-off-by: Joseph Sutton Reviewed-by: Andrew Bartlett --- diff --git a/python/samba/tests/krb5/claims_tests.py b/python/samba/tests/krb5/claims_tests.py index 0144e961818..66a62303183 100755 --- a/python/samba/tests/krb5/claims_tests.py +++ b/python/samba/tests/krb5/claims_tests.py @@ -716,10 +716,6 @@ class ClaimsTests(KDCBaseTest): etypes=etypes) self.check_reply(rep, KRB_TGS_REP) - @staticmethod - def freeze(m): - return frozenset((k, v) for k, v in m.items()) - @classmethod def setUpDynamicTestCases(cls): FILTER = env_get_var_value('FILTER', allow_missing=True) diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py index 26b2dfbe655..e2d328d1dc7 100644 --- a/python/samba/tests/krb5/kdc_base_test.py +++ b/python/samba/tests/krb5/kdc_base_test.py @@ -177,6 +177,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): cls.accounts = [] cls.account_cache = {} + cls.policy_cache = {} cls.tkt_cache = {} cls._rodc_ctx = None @@ -306,6 +307,10 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): # Return a copy of the DN. return ldb.Dn(samdb, str(self._authn_silos_dn)) + @staticmethod + def freeze(m): + return frozenset((k, v) for k, v in m.items()) + def tearDown(self): # Run any cleanups that may modify accounts prior to deleting those # accounts. @@ -577,20 +582,41 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): return claim_id - def create_authn_policy(self, - policy_id, - enforced=None, - strong_ntlm_policy=None, - user_allowed_from=None, - user_allowed_ntlm=None, - user_allowed_to=None, - user_tgt_lifetime=None, - computer_allowed_to=None, - computer_tgt_lifetime=None, - service_allowed_from=None, - service_allowed_ntlm=None, - service_allowed_to=None, - service_tgt_lifetime=None): + def create_authn_policy(self, *args, + use_cache=True, + **kwargs): + + if use_cache: + cache_key = self.freeze({ + args: None, + **kwargs, + }) + + authn_policy = self.policy_cache.get(cache_key) + if authn_policy is not None: + return authn_policy + + authn_policy = self.create_authn_policy_opts(*args, **kwargs) + if use_cache: + self.policy_cache[cache_key] = authn_policy + + return authn_policy + + def create_authn_policy_opts(self, + policy_id, + *, + enforced=None, + strong_ntlm_policy=None, + user_allowed_from=None, + user_allowed_ntlm=None, + user_allowed_to=None, + user_tgt_lifetime=None, + computer_allowed_to=None, + computer_tgt_lifetime=None, + service_allowed_from=None, + service_allowed_ntlm=None, + service_allowed_to=None, + service_tgt_lifetime=None): samdb = self.get_samdb() policy_dn = self.get_authn_policies_dn()