cls.accounts = []
cls.account_cache = {}
+ cls.policy_cache = {}
cls.tkt_cache = {}
cls._rodc_ctx = None
# Return a copy of the DN.
return ldb.Dn(samdb, str(self._authn_silos_dn))
+ @staticmethod
+ def freeze(m):
+ return frozenset((k, v) for k, v in m.items())
+
def tearDown(self):
# Run any cleanups that may modify accounts prior to deleting those
# accounts.
return claim_id
- def create_authn_policy(self,
- policy_id,
- enforced=None,
- strong_ntlm_policy=None,
- user_allowed_from=None,
- user_allowed_ntlm=None,
- user_allowed_to=None,
- user_tgt_lifetime=None,
- computer_allowed_to=None,
- computer_tgt_lifetime=None,
- service_allowed_from=None,
- service_allowed_ntlm=None,
- service_allowed_to=None,
- service_tgt_lifetime=None):
+ def create_authn_policy(self, *args,
+ use_cache=True,
+ **kwargs):
+
+ if use_cache:
+ cache_key = self.freeze({
+ args: None,
+ **kwargs,
+ })
+
+ authn_policy = self.policy_cache.get(cache_key)
+ if authn_policy is not None:
+ return authn_policy
+
+ authn_policy = self.create_authn_policy_opts(*args, **kwargs)
+ if use_cache:
+ self.policy_cache[cache_key] = authn_policy
+
+ return authn_policy
+
+ def create_authn_policy_opts(self,
+ policy_id,
+ *,
+ enforced=None,
+ strong_ntlm_policy=None,
+ user_allowed_from=None,
+ user_allowed_ntlm=None,
+ user_allowed_to=None,
+ user_tgt_lifetime=None,
+ computer_allowed_to=None,
+ computer_tgt_lifetime=None,
+ service_allowed_from=None,
+ service_allowed_ntlm=None,
+ service_allowed_to=None,
+ service_tgt_lifetime=None):
samdb = self.get_samdb()
policy_dn = self.get_authn_policies_dn()