]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
python:tests/krb5: let create_trust() take {ingress,egress}_claims_tf_rules
authorStefan Metzmacher <metze@samba.org>
Wed, 5 Feb 2025 08:15:47 +0000 (09:15 +0100)
committerRalph Boehme <slow@samba.org>
Mon, 24 Feb 2025 10:28:02 +0000 (10:28 +0000)
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
Autobuild-User(master): Ralph Böhme <slow@samba.org>
Autobuild-Date(master): Mon Feb 24 10:28:02 UTC 2025 on atb-devel-224

python/samba/tests/krb5/kdc_base_test.py

index 8edd7dff460485c1473fb9f07f19c080b9fb05eb..20a004579a4bef6882d6c5d8dc17272f0e4f6ac7 100644 (file)
@@ -103,6 +103,10 @@ from samba.join import DCJoinContext
 from samba.ndr import ndr_pack, ndr_unpack
 from samba.param import LoadParm
 from samba.samdb import SamDB, dsdb_Dn
+from samba.security import (
+    claims_tf_policy_parse_rules,
+    claims_tf_policy_wrap_xml,
+)
 
 rc4_bit = security.KERB_ENCTYPE_RC4_HMAC_MD5
 aes256_sk_bit = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
@@ -206,6 +210,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         cls.ldb_cleanups = []
 
         cls._claim_types_dn = None
+        cls._claim_tf_policies_dn = None
         cls._authn_policy_config_dn = None
         cls._authn_policies_dn = None
         cls._authn_silos_dn = None
@@ -250,6 +255,46 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         # Return a copy of the DN.
         return ldb.Dn(samdb, str(self._claim_types_dn))
 
+    def get_claim_tf_policies_dn(self):
+        samdb = self.get_samdb()
+
+        if self._claim_tf_policies_dn is None:
+            claim_config_dn = samdb.get_config_basedn()
+
+            claim_config_dn.add_child('CN=Claims Configuration,CN=Services')
+            details = {
+                'dn': claim_config_dn,
+                'objectClass': 'container',
+            }
+            try:
+                samdb.add(details)
+            except ldb.LdbError as err:
+                num, _ = err.args
+                if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
+                    raise
+            else:
+                self.accounts.append(str(claim_config_dn))
+
+            claim_tf_policies_dn = claim_config_dn
+            claim_tf_policies_dn.add_child('CN=Claims Transformation Policies')
+            details = {
+                'dn': claim_tf_policies_dn,
+                'objectClass': 'msDS-ClaimsTransformationPolicies',
+            }
+            try:
+                samdb.add(details)
+            except ldb.LdbError as err:
+                num, _ = err.args
+                if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
+                    raise
+            else:
+                self.accounts.append(str(claim_tf_policies_dn))
+
+            type(self)._claim_tf_policies_dn = claim_tf_policies_dn
+
+        # Return a copy of the DN.
+        return ldb.Dn(samdb, str(self._claim_tf_policies_dn))
+
     def get_authn_policy_config_dn(self):
         samdb = self.get_samdb()
 
@@ -912,6 +957,8 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
                      forest_info=None,
                      trust_incoming_password=None,
                      trust_outgoing_password=None,
+                     ingress_claims_tf_rules=None,
+                     egress_claims_tf_rules=None,
                      expect_error=None,
                      preserve=True):
         """Create an trust account for testing.
@@ -1011,6 +1058,42 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
 
         samdb = self.get_samdb()
 
+        def claims_tf_policy_dn(name, rules):
+            xml_rules = claims_tf_policy_wrap_xml(rules)
+            rs = claims_tf_policy_parse_rules(xml_rules, strip_xml=True)
+            policy_dn = self.get_claim_tf_policies_dn()
+            policy_dn.add_child('CN=%s' % name)
+            details = {
+                'dn': policy_dn,
+                'objectClass': 'msDS-ClaimsTransformationPolicyType',
+                'msDS-TransformationRules': xml_rules,
+            }
+            try:
+                samdb.add(details)
+            except ldb.LdbError as err:
+                num, _ = err.args
+                if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
+                    raise
+                raise
+            else:
+                self.accounts.append(str(policy_dn))
+
+            return policy_dn
+
+        if ingress_claims_tf_rules is not None:
+            ingress_policy_dn = claims_tf_policy_dn("%s-Ingress" %
+                                                    trust_dns_name.string,
+                                                    ingress_claims_tf_rules)
+        else:
+            ingress_policy_dn = None
+
+        if egress_claims_tf_rules is not None:
+            egress_policy_dn = claims_tf_policy_dn("%s-Egress" %
+                                                   trust_dns_name.string,
+                                                   egress_claims_tf_rules)
+        else:
+            egress_policy_dn = None
+
         incoming_account_name = trust_info.netbios_name.string
         incoming_account_name += '$'
         incoming_nbt_domain = local_info.name.string
@@ -1029,6 +1112,22 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
         self.assertEqual(len(tdo_res), 1)
         tdo_dn = tdo_res[0].dn
 
+        if ingress_policy_dn or egress_policy_dn:
+            msg = ldb.Message()
+            msg.dn = tdo_dn
+            if ingress_policy_dn:
+                msg['ingress'] = ldb.MessageElement(
+                    str(ingress_policy_dn),
+                    ldb.FLAG_MOD_REPLACE,
+                    'msDS-IngressClaimsTransformationPolicy')
+            if egress_policy_dn:
+                msg['egress'] = ldb.MessageElement(
+                    str(egress_policy_dn),
+                    ldb.FLAG_MOD_REPLACE,
+                    'msDS-EgressClaimsTransformationPolicy')
+
+            samdb.modify(msg)
+
         acct_search_filter = "(&(objectClass=user)(sAMAccountName=%s))" % (
                              incoming_account_name)
         acct_res = samdb.search(scope=ldb.SCOPE_SUBTREE,