from samba.ndr import ndr_pack, ndr_unpack
from samba.param import LoadParm
from samba.samdb import SamDB, dsdb_Dn
+from samba.security import (
+ claims_tf_policy_parse_rules,
+ claims_tf_policy_wrap_xml,
+)
rc4_bit = security.KERB_ENCTYPE_RC4_HMAC_MD5
aes256_sk_bit = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK
cls.ldb_cleanups = []
cls._claim_types_dn = None
+ cls._claim_tf_policies_dn = None
cls._authn_policy_config_dn = None
cls._authn_policies_dn = None
cls._authn_silos_dn = None
# Return a copy of the DN.
return ldb.Dn(samdb, str(self._claim_types_dn))
+ def get_claim_tf_policies_dn(self):
+ samdb = self.get_samdb()
+
+ if self._claim_tf_policies_dn is None:
+ claim_config_dn = samdb.get_config_basedn()
+
+ claim_config_dn.add_child('CN=Claims Configuration,CN=Services')
+ details = {
+ 'dn': claim_config_dn,
+ 'objectClass': 'container',
+ }
+ try:
+ samdb.add(details)
+ except ldb.LdbError as err:
+ num, _ = err.args
+ if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
+ raise
+ else:
+ self.accounts.append(str(claim_config_dn))
+
+ claim_tf_policies_dn = claim_config_dn
+ claim_tf_policies_dn.add_child('CN=Claims Transformation Policies')
+ details = {
+ 'dn': claim_tf_policies_dn,
+ 'objectClass': 'msDS-ClaimsTransformationPolicies',
+ }
+ try:
+ samdb.add(details)
+ except ldb.LdbError as err:
+ num, _ = err.args
+ if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
+ raise
+ else:
+ self.accounts.append(str(claim_tf_policies_dn))
+
+ type(self)._claim_tf_policies_dn = claim_tf_policies_dn
+
+ # Return a copy of the DN.
+ return ldb.Dn(samdb, str(self._claim_tf_policies_dn))
+
def get_authn_policy_config_dn(self):
samdb = self.get_samdb()
forest_info=None,
trust_incoming_password=None,
trust_outgoing_password=None,
+ ingress_claims_tf_rules=None,
+ egress_claims_tf_rules=None,
expect_error=None,
preserve=True):
"""Create an trust account for testing.
samdb = self.get_samdb()
+ def claims_tf_policy_dn(name, rules):
+ xml_rules = claims_tf_policy_wrap_xml(rules)
+ rs = claims_tf_policy_parse_rules(xml_rules, strip_xml=True)
+ policy_dn = self.get_claim_tf_policies_dn()
+ policy_dn.add_child('CN=%s' % name)
+ details = {
+ 'dn': policy_dn,
+ 'objectClass': 'msDS-ClaimsTransformationPolicyType',
+ 'msDS-TransformationRules': xml_rules,
+ }
+ try:
+ samdb.add(details)
+ except ldb.LdbError as err:
+ num, _ = err.args
+ if num != ldb.ERR_ENTRY_ALREADY_EXISTS:
+ raise
+ raise
+ else:
+ self.accounts.append(str(policy_dn))
+
+ return policy_dn
+
+ if ingress_claims_tf_rules is not None:
+ ingress_policy_dn = claims_tf_policy_dn("%s-Ingress" %
+ trust_dns_name.string,
+ ingress_claims_tf_rules)
+ else:
+ ingress_policy_dn = None
+
+ if egress_claims_tf_rules is not None:
+ egress_policy_dn = claims_tf_policy_dn("%s-Egress" %
+ trust_dns_name.string,
+ egress_claims_tf_rules)
+ else:
+ egress_policy_dn = None
+
incoming_account_name = trust_info.netbios_name.string
incoming_account_name += '$'
incoming_nbt_domain = local_info.name.string
self.assertEqual(len(tdo_res), 1)
tdo_dn = tdo_res[0].dn
+ if ingress_policy_dn or egress_policy_dn:
+ msg = ldb.Message()
+ msg.dn = tdo_dn
+ if ingress_policy_dn:
+ msg['ingress'] = ldb.MessageElement(
+ str(ingress_policy_dn),
+ ldb.FLAG_MOD_REPLACE,
+ 'msDS-IngressClaimsTransformationPolicy')
+ if egress_policy_dn:
+ msg['egress'] = ldb.MessageElement(
+ str(egress_policy_dn),
+ ldb.FLAG_MOD_REPLACE,
+ 'msDS-EgressClaimsTransformationPolicy')
+
+ samdb.modify(msg)
+
acct_search_filter = "(&(objectClass=user)(sAMAccountName=%s))" % (
incoming_account_name)
acct_res = samdb.search(scope=ldb.SCOPE_SUBTREE,