From: Stefan Metzmacher Date: Wed, 5 Feb 2025 08:15:47 +0000 (+0100) Subject: python:tests/krb5: let create_trust() take {ingress,egress}_claims_tf_rules X-Git-Tag: tevent-0.17.0~622 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2dba2a31c27fc8779bc27d4095eca1948831b04d;p=thirdparty%2Fsamba.git python:tests/krb5: let create_trust() take {ingress,egress}_claims_tf_rules Signed-off-by: Stefan Metzmacher Reviewed-by: Ralph Boehme Autobuild-User(master): Ralph Böhme Autobuild-Date(master): Mon Feb 24 10:28:02 UTC 2025 on atb-devel-224 --- diff --git a/python/samba/tests/krb5/kdc_base_test.py b/python/samba/tests/krb5/kdc_base_test.py index 8edd7dff460..20a004579a4 100644 --- a/python/samba/tests/krb5/kdc_base_test.py +++ b/python/samba/tests/krb5/kdc_base_test.py @@ -103,6 +103,10 @@ from samba.join import DCJoinContext from samba.ndr import ndr_pack, ndr_unpack from samba.param import LoadParm from samba.samdb import SamDB, dsdb_Dn +from samba.security import ( + claims_tf_policy_parse_rules, + claims_tf_policy_wrap_xml, +) rc4_bit = security.KERB_ENCTYPE_RC4_HMAC_MD5 aes256_sk_bit = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK @@ -206,6 +210,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): cls.ldb_cleanups = [] cls._claim_types_dn = None + cls._claim_tf_policies_dn = None cls._authn_policy_config_dn = None cls._authn_policies_dn = None cls._authn_silos_dn = None @@ -250,6 +255,46 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): # Return a copy of the DN. return ldb.Dn(samdb, str(self._claim_types_dn)) + def get_claim_tf_policies_dn(self): + samdb = self.get_samdb() + + if self._claim_tf_policies_dn is None: + claim_config_dn = samdb.get_config_basedn() + + claim_config_dn.add_child('CN=Claims Configuration,CN=Services') + details = { + 'dn': claim_config_dn, + 'objectClass': 'container', + } + try: + samdb.add(details) + except ldb.LdbError as err: + num, _ = err.args + if num != ldb.ERR_ENTRY_ALREADY_EXISTS: + raise + else: + self.accounts.append(str(claim_config_dn)) + + claim_tf_policies_dn = claim_config_dn + claim_tf_policies_dn.add_child('CN=Claims Transformation Policies') + details = { + 'dn': claim_tf_policies_dn, + 'objectClass': 'msDS-ClaimsTransformationPolicies', + } + try: + samdb.add(details) + except ldb.LdbError as err: + num, _ = err.args + if num != ldb.ERR_ENTRY_ALREADY_EXISTS: + raise + else: + self.accounts.append(str(claim_tf_policies_dn)) + + type(self)._claim_tf_policies_dn = claim_tf_policies_dn + + # Return a copy of the DN. + return ldb.Dn(samdb, str(self._claim_tf_policies_dn)) + def get_authn_policy_config_dn(self): samdb = self.get_samdb() @@ -912,6 +957,8 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): forest_info=None, trust_incoming_password=None, trust_outgoing_password=None, + ingress_claims_tf_rules=None, + egress_claims_tf_rules=None, expect_error=None, preserve=True): """Create an trust account for testing. @@ -1011,6 +1058,42 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): samdb = self.get_samdb() + def claims_tf_policy_dn(name, rules): + xml_rules = claims_tf_policy_wrap_xml(rules) + rs = claims_tf_policy_parse_rules(xml_rules, strip_xml=True) + policy_dn = self.get_claim_tf_policies_dn() + policy_dn.add_child('CN=%s' % name) + details = { + 'dn': policy_dn, + 'objectClass': 'msDS-ClaimsTransformationPolicyType', + 'msDS-TransformationRules': xml_rules, + } + try: + samdb.add(details) + except ldb.LdbError as err: + num, _ = err.args + if num != ldb.ERR_ENTRY_ALREADY_EXISTS: + raise + raise + else: + self.accounts.append(str(policy_dn)) + + return policy_dn + + if ingress_claims_tf_rules is not None: + ingress_policy_dn = claims_tf_policy_dn("%s-Ingress" % + trust_dns_name.string, + ingress_claims_tf_rules) + else: + ingress_policy_dn = None + + if egress_claims_tf_rules is not None: + egress_policy_dn = claims_tf_policy_dn("%s-Egress" % + trust_dns_name.string, + egress_claims_tf_rules) + else: + egress_policy_dn = None + incoming_account_name = trust_info.netbios_name.string incoming_account_name += '$' incoming_nbt_domain = local_info.name.string @@ -1029,6 +1112,22 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest): self.assertEqual(len(tdo_res), 1) tdo_dn = tdo_res[0].dn + if ingress_policy_dn or egress_policy_dn: + msg = ldb.Message() + msg.dn = tdo_dn + if ingress_policy_dn: + msg['ingress'] = ldb.MessageElement( + str(ingress_policy_dn), + ldb.FLAG_MOD_REPLACE, + 'msDS-IngressClaimsTransformationPolicy') + if egress_policy_dn: + msg['egress'] = ldb.MessageElement( + str(egress_policy_dn), + ldb.FLAG_MOD_REPLACE, + 'msDS-EgressClaimsTransformationPolicy') + + samdb.modify(msg) + acct_search_filter = "(&(objectClass=user)(sAMAccountName=%s))" % ( incoming_account_name) acct_res = samdb.search(scope=ldb.SCOPE_SUBTREE,