From: Jennifer Sutton Date: Wed, 20 Aug 2025 07:10:43 +0000 (+1200) Subject: s4:dsdb:tests: Add tests for msDS-KeyCredentialLink attribute X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;p=thirdparty%2Fsamba.git s4:dsdb:tests: Add tests for msDS-KeyCredentialLink attribute Signed-off-by: Jennifer Sutton Reviewed-by: Douglas Bagnall Autobuild-User(master): Douglas Bagnall Autobuild-Date(master): Wed Aug 27 04:44:59 UTC 2025 on atb-devel-224 --- diff --git a/source4/dsdb/tests/python/key_credential_link.py b/source4/dsdb/tests/python/key_credential_link.py new file mode 100755 index 00000000000..5afa54d05f0 --- /dev/null +++ b/source4/dsdb/tests/python/key_credential_link.py @@ -0,0 +1,474 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# These are unit tests for LDAP access checks + +import optparse +import sys + +sys.path.insert(0, "bin/python") +import samba + +from typing import List, Optional + +from samba.tests.subunitrun import SubunitOptions, TestProgram + +import samba.getopt as options + +from ldb import ( + LdbError, + ERR_INSUFFICIENT_ACCESS_RIGHTS, +) +from ldb import ERR_CONSTRAINT_VIOLATION +from ldb import Message, MessageElement, Dn +from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE +from samba.dcerpc import security + +from samba.auth import system_session +from samba import gensec, key_credential_link, sd_utils +from samba.samdb import BinaryDn, SamDB +from samba.credentials import Credentials, DONT_USE_KERBEROS +import samba.tests +from samba.tests import delete_force +import samba.dsdb + +from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat +from cryptography.hazmat.primitives import asymmetric +from cryptography.hazmat.backends import default_backend + +parser = optparse.OptionParser("key_credential_link.py [options] ") +sambaopts = options.SambaOptions(parser) +parser.add_option_group(sambaopts) +parser.add_option_group(options.VersionOptions(parser)) + +# use command line creds if available +credopts = options.CredentialsOptions(parser) +parser.add_option_group(credopts) +subunitopts = SubunitOptions(parser) +parser.add_option_group(subunitopts) + +opts, args = parser.parse_args() + +if len(args) < 1: + parser.print_usage() + sys.exit(1) + +host = args[0] +if "://" not in host: + ldaphost = "ldap://%s" % host +else: + ldaphost = host + start = host.rindex("://") + host = host.lstrip(start + 3) + +lp = sambaopts.get_loadparm() +creds = credopts.get_credentials(lp) +creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + +# +# Tests start here +# + + +class AclTests(samba.tests.TestCase): + def setUp(self): + super(AclTests, self).setUp() + + strict_checking = samba.tests.env_get_var_value( + "STRICT_CHECKING", allow_missing=True + ) + if strict_checking is None: + strict_checking = "1" + self.strict_checking = bool(int(strict_checking)) + + self.ldb_admin = SamDB( + ldaphost, credentials=creds, session_info=system_session(lp), lp=lp + ) + self.base_dn = self.ldb_admin.domain_dn() + self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid()) + self.user_pass = "samba123@" + self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized() + self.sd_utils = sd_utils.SDUtils(self.ldb_admin) + self.addCleanup(self.delete_admin_connection) + + # set AttributeAuthorizationOnLDAPAdd and BlockOwnerImplicitRights + self.set_heuristic(samba.dsdb.DS_HR_ATTR_AUTHZ_ON_LDAP_ADD, b"11") + + def set_heuristic(self, index, values): + self.assertGreater(index, 0) + self.assertLess(index, 30) + self.assertIsInstance(values, bytes) + + # Get the old "dSHeuristics" if it was set + dsheuristics = self.ldb_admin.get_dsheuristics() + # Reset the "dSHeuristics" as they were before + self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics) + # Set the "dSHeuristics" to activate the correct behaviour + default_heuristics = b"000000000100000000020000000003" + if dsheuristics is None: + dsheuristics = b"" + dsheuristics += default_heuristics[len(dsheuristics) :] + dsheuristics = ( + dsheuristics[: index - 1] + values + dsheuristics[index - 1 + len(values) :] + ) + self.ldb_admin.set_dsheuristics(dsheuristics) + + def get_user_dn(self, name): + return "CN=%s,CN=Users,%s" % (name, self.base_dn) + + def get_computer_dn(self, name): + return f"CN={name},CN=Computers,{self.base_dn}" + + def get_creds(self, target_username, target_password): + creds_tmp = Credentials() + creds_tmp.set_username(target_username) + creds_tmp.set_password(target_password) + creds_tmp.set_domain(creds.get_domain()) + creds_tmp.set_realm(creds.get_realm()) + creds_tmp.set_workstation(creds.get_workstation()) + creds_tmp.set_gensec_features( + creds_tmp.get_gensec_features() | gensec.FEATURE_SEAL + ) + creds_tmp.set_kerberos_state( + DONT_USE_KERBEROS + ) # kinit is too expensive to use in a tight loop + return creds_tmp + + def get_ldb_connection(self, target_username, target_password): + creds_tmp = self.get_creds(target_username, target_password) + ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp) + return ldb_target + + def delete_admin_connection(self): + del self.sd_utils + del self.ldb_admin + + +class AclKeyCredentialLinkTests(AclTests): + def setUp(self): + super().setUp() + self.user = "acl_key_cred_user" + self.computer = "acl_key_cred_comp" + self.user_dn = self.get_user_dn(self.user) + self.computer_dn = self.get_computer_dn(self.computer) + delete_force(self.ldb_admin, self.user_dn) + delete_force(self.ldb_admin, self.computer_dn) + self.ldb_admin.newuser(self.user, self.user_pass) + self.ldb_admin.newcomputer(self.computer) + self.ldb_admin.setpassword(f"sAMAccountName={self.computer}$", self.user_pass) + self.user_creds = self.get_creds(self.user, self.user_pass) + self.ldb_user = self.get_ldb_connection(self.user, self.user_pass) + self.ldb_computer = self.get_ldb_connection(f"{self.computer}$", self.user_pass) + + def tearDown(self): + super().tearDown() + delete_force(self.ldb_admin, self.user_dn) + delete_force(self.ldb_admin, self.computer_dn) + + def create_key_credential_link_value(self, target_dn): + private_key = asymmetric.rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + public_key = private_key.public_key() + + public_data = public_key.public_bytes( + encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo + ) + + return key_credential_link.create_key_credential_link( + self.ldb_admin, target_dn, public_data + ) + + def test_can_add_with_write_property(self): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_write=True, + ) + + def test_cannot_add_without_write_property_or_validated_write(self): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + expect_error=ERR_INSUFFICIENT_ACCESS_RIGHTS, + ) + + def test_can_delete_with_validated_write(self): + target_dn = self.computer_dn + target_sid = self.sd_utils.get_object_sid(target_dn) + + msg = Message(Dn(self.ldb_admin, target_dn)) + msg["msDS-KeyCredentialLink"] = MessageElement( + [ + self.create_key_credential_link_value(self.computer_dn) + .get_linearized() + .encode() + ], + FLAG_MOD_ADD, + "msDS-KeyCredentialLink", + ) + self.ldb_admin.modify(msg) + + mod = f"(OD;;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_MS_DS_KEY_CREDENTIAL_LINK};;{target_sid})" + self.sd_utils.dacl_add_ace(target_dn, mod) + + mod = f"(OA;;SW;{security.GUID_DRS_DS_VALIDATED_WRITE_COMPUTER};;{target_sid})" + self.sd_utils.dacl_add_ace(target_dn, mod) + + msg = Message(Dn(self.ldb_admin, target_dn)) + msg["msDS-KeyCredentialLink"] = MessageElement( + [], + FLAG_MOD_DELETE, + "msDS-KeyCredentialLink", + ) + self.ldb_computer.modify(msg) + + def test_cannot_delete_without_write_property_or_validated_write(self): + target_dn = self.computer_dn + target_sid = self.sd_utils.get_object_sid(target_dn) + + msg = Message(Dn(self.ldb_admin, target_dn)) + msg["msDS-KeyCredentialLink"] = MessageElement( + [ + self.create_key_credential_link_value(self.computer_dn) + .get_linearized() + .encode() + ], + FLAG_MOD_ADD, + "msDS-KeyCredentialLink", + ) + self.ldb_admin.modify(msg) + + mod = f"(OD;;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_MS_DS_KEY_CREDENTIAL_LINK};;{target_sid})" + self.sd_utils.dacl_add_ace(target_dn, mod) + + mod = f"(OD;;SW;{security.GUID_DRS_DS_VALIDATED_WRITE_COMPUTER};;{target_sid})" + self.sd_utils.dacl_add_ace(target_dn, mod) + + msg = Message(Dn(self.ldb_admin, target_dn)) + msg["msDS-KeyCredentialLink"] = MessageElement( + [], + FLAG_MOD_DELETE, + "msDS-KeyCredentialLink", + ) + try: + self.ldb_computer.modify(msg) + except LdbError as err: + self.assertEqual(ERR_INSUFFICIENT_ACCESS_RIGHTS, err.args[0]) + else: + self.fail("expected to fail") + + def test_can_add_to_computer_with_validated_write(self): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_validated_write=True, + ) + + def test_cannot_add_to_non_computer_with_validated_write(self): + self._test_key_cred_link( + self.ldb_user, + self.user_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_validated_write=True, + expect_error=ERR_INSUFFICIENT_ACCESS_RIGHTS, + ) + + def test_can_add_to_non_computer_with_write_property(self): + self._test_key_cred_link( + self.ldb_user, + self.user_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_write=True, + ) + + def test_can_add_multiple_values_with_write_property(self): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [ + self.create_key_credential_link_value(self.computer_dn), + self.create_key_credential_link_value(self.computer_dn), + ], + allow_write=True, + ) + + def test_cannot_add_multiple_values_with_validated_write(self): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [ + self.create_key_credential_link_value(self.computer_dn), + self.create_key_credential_link_value(self.computer_dn), + ], + allow_validated_write=True, + expect_error=ERR_INSUFFICIENT_ACCESS_RIGHTS, + ) + + def test_can_replace_no_values_with_validated_write(self): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [], + allow_validated_write=True, + replace_existing=True, + ) + + def test_cannot_add_no_values(self): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [], + allow_write=True, + expect_error=ERR_CONSTRAINT_VIOLATION, + ) + + def test_can_add_to_existing_value_with_write_property(self): + for _ in range(2): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_write=True, + ) + + def test_cannot_add_to_existing_value_with_validated_write(self): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_validated_write=True, + ) + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_validated_write=True, + expect_error=ERR_INSUFFICIENT_ACCESS_RIGHTS, + ) + + def test_can_replace_existing_value_with_write_property(self): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_write=True, + ) + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_write=True, + replace_existing=True, + ) + + def test_cannot_replace_existing_value_with_validated_write(self): + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_validated_write=True, + ) + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + allow_validated_write=True, + replace_existing=True, + expect_error=ERR_INSUFFICIENT_ACCESS_RIGHTS, + ) + + def test_can_add_malformed_value_with_write_property(self): + key_cred_link = BinaryDn.from_bytes_and_dn( + self.ldb_admin, b"foo bar baz", self.computer_dn + ) + + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [key_cred_link], + allow_write=True, + ) + + def test_cannot_add_malformed_value_with_validated_write(self): + key_cred_link = BinaryDn.from_bytes_and_dn( + self.ldb_admin, b"foo bar baz", self.computer_dn + ) + + self._test_key_cred_link( + self.ldb_computer, + self.computer_dn, + [key_cred_link], + allow_validated_write=True, + expect_error=ERR_INSUFFICIENT_ACCESS_RIGHTS, + ) + + def test_can_add_to_other_with_write_property(self): + self._test_key_cred_link( + self.ldb_user, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + user_sid=self.sd_utils.get_object_sid(self.user_dn), + allow_write=True, + ) + + def test_cannot_add_to_other_with_validated_write(self): + self._test_key_cred_link( + self.ldb_user, + self.computer_dn, + [self.create_key_credential_link_value(self.computer_dn)], + user_sid=self.sd_utils.get_object_sid(self.user_dn), + allow_validated_write=True, + expect_error=ERR_INSUFFICIENT_ACCESS_RIGHTS, + ) + + def _test_key_cred_link( + self, + samdb: SamDB, + target_dn: str, + key_cred_link_dns: List[key_credential_link.KeyCredentialLinkDn], + *, + user_sid: Optional[security.dom_sid] = None, + allow_write: bool = False, + allow_validated_write: bool = False, + replace_existing: bool = False, + expect_error: int = 0, + ): + if user_sid is None: + user_sid = self.sd_utils.get_object_sid(target_dn) + + mod = f"(O{'A' if allow_write else 'D'};;WP;{samba.dsdb.DS_GUID_SCHEMA_ATTR_MS_DS_KEY_CREDENTIAL_LINK};;{user_sid})" + self.sd_utils.dacl_add_ace(target_dn, mod) + + # Note: SELF and OWNER have GUID_DRS_DS_VALIDATED_WRITE_COMPUTER by + # default. + mod = f"(O{'A' if allow_validated_write else 'D'};;SW;{security.GUID_DRS_DS_VALIDATED_WRITE_COMPUTER};;{user_sid})" + self.sd_utils.dacl_add_ace(target_dn, mod) + + key_cred_links = [dn.get_linearized().encode() for dn in key_cred_link_dns] + + msg = Message(Dn(samdb, target_dn)) + msg["msDS-KeyCredentialLink"] = MessageElement( + key_cred_links, + FLAG_MOD_REPLACE if replace_existing else FLAG_MOD_ADD, + "msDS-KeyCredentialLink", + ) + try: + samdb.modify(msg) + except LdbError as err: + if not expect_error: + self.fail("got unexpected error") + + self.assertEqual(expect_error, err.args[0]) + else: + if expect_error: + self.fail(f"expected to fail with error code {expect_error}") + + +# Important unit running information + +ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp) + +TestProgram(module=__name__, opts=subunitopts) diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index 13d868c6873..2959a2c35b7 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -1718,6 +1718,8 @@ for env in all_fl_envs + ["schema_dc"]: plantestsuite_loadlist("samba4.ldap.acl.python(%s)" % env, env, ["STRICT_CHECKING=0", python, os.path.join(DSDB_PYTEST_DIR, "acl.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) plantestsuite_loadlist("samba4.ldap.acl_modify.python(%s)" % env, env, ["STRICT_CHECKING=0", python, os.path.join(DSDB_PYTEST_DIR, "acl_modify.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) +plantestsuite_loadlist("samba4.ldap.key_credential_link.python", "schema_dc", ["STRICT_CHECKING=0", python, os.path.join(DSDB_PYTEST_DIR, "key_credential_link.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) + for env in all_fl_envs + ["schema_dc", "ad_dc_no_ntlm"]: if env != "fl2000dc": # This test makes excessive use of the "userPassword" attribute which