#
from samba import provision, param
import os
+import random
import shutil
import subprocess
from samba.tests import (env_loadparm, create_test_ou, BlackboxProcessError,
- BlackboxTestCase, connect_samdb)
+ BlackboxTestCase, connect_samdb, delete_force)
+from samba.tests.gkdi import GkdiBaseTest
import ldb
from samba.samdb import SamDB
from samba.auth import system_session
from samba.netcmd.fsmo import get_fsmo_roleowner
import re
from samba import sites
-from samba.dsdb import _dsdb_load_udv_v2
+from samba.dsdb import (
+ _dsdb_load_udv_v2,
+ DS_GUID_DOMAIN_CONTROLLERS_CONTAINER,
+)
+from samba import dsdb
from samba import safe_tarfile as tarfile
scope=ldb.SCOPE_SUBTREE,
expression="(objectClass=kerberosSecret)")
+# A set of attributes we know should be confidential.
+confidential_attrs = frozenset({
+ "msFVE-KeyPackage",
+ "msFVE-RecoveryPassword",
+ "msKds-RootKeyData",
+})
+
+
+def confidential_values_present(msg: ldb.Message) -> bool:
+ return any(map(msg.get, confidential_attrs))
+
+
# The backup tests require that a completely clean LoadParm object gets used
# for the restore. Otherwise the same global LP gets re-used, and the LP
# settings can bleed from one test case to another.
# in a separate process (as opposed to runcmd(), runsubcmd()).
# So although this is a samba-tool test, we don't inherit from SambaToolCmdTest
# so that we never inadvertently use .runcmd() by accident.
-class DomainBackupBase(BlackboxTestCase):
+class DomainBackupBase(BlackboxTestCase, GkdiBaseTest):
def setUp(self):
super().setUp()
backup_file = self.create_backup()
self.restore_backup(backup_file)
lp = self.check_restored_smbconf()
- self.check_restored_database(lp)
+ restored_samdb = self.check_restored_database(lp)
+
+ # Search across naming contexts to ensure we don’t miss any objects.
+ cross_ncs = ["search_options:1:2"]
+
+ # Search the entire restored database for GKDI root keys.
+ restored_root_keys = restored_samdb.search(
+ restored_samdb.get_root_basedn(),
+ expression="objectClass=msKds-ProvRootKey",
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=list(confidential_attrs),
+ controls=cross_ncs,
+ )
+ # Assert that at least one key exists.
+ self.assertLess(0, len(restored_root_keys))
+
+ # Assert that the confidential material is still present.
+ for msg in restored_root_keys:
+ self.assertTrue(confidential_values_present(msg))
def _test_backup_restore_no_secrets(self):
"""Does a backup/restore with secrets excluded from the resulting DB"""
+ samdb: SamDB = self.ldb
+
+ # Use the DC account as the parent of a new msFVE-RecoveryInformation object.
+ mach_dn = samdb.get_wellknown_dn(
+ samdb.get_default_basedn(), DS_GUID_DOMAIN_CONTROLLERS_CONTAINER
+ )
+ mach_dn.add_child(f"CN={self.server}")
+
+ # Get the machine account's GUID.
+ res = samdb.search(mach_dn, scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
+ mach_guid = res[0].get("objectGUID", idx=0)
+
+ # Create a child msFVE-RecoveryInformation object with some confidential
+ # attributes.
+
+ recovery_dn = ldb.Dn(samdb, str(mach_dn))
+ recovery_dn.add_child(f"CN=recovery_info_{random.randint(0, 0xFFFF)}")
+
+ self.addCleanup(delete_force, samdb, recovery_dn)
+ samdb.add({
+ "dn": recovery_dn,
+ "objectClass": "msFVE-RecoveryInformation",
+ "msFVE-RecoveryGuid": mach_guid,
+ "msFVE-KeyPackage": b"secret key package",
+ "msFVE-RecoveryPassword": "Secret007",
+ })
+
+ def ldap_display_name(item: ldb.Message) -> str:
+ return item.get("lDAPDisplayName", idx=0).decode()
+
+ # Look up all the confidential attributes in the schema.
+ res = samdb.search(
+ samdb.get_schema_basedn(),
+ expression=(
+ f"(&(objectClass=attributeSchema)"
+ f"(searchFlags:{ldb.OID_COMPARATOR_AND}"
+ f":={dsdb.SEARCH_FLAG_CONFIDENTIAL}))"
+ ),
+ attrs=["lDAPDisplayName"],
+ )
+ all_confidential_attrs = set(map(ldap_display_name, res))
+
+ # Ensure that the attributes we think are confidential actually are.
+ self.assertLessEqual(confidential_attrs, all_confidential_attrs)
+
+ # Search for GKDI root keys containing secret material.
+ root_key_container_dn = self.get_root_key_container_dn(samdb)
+ root_keys = samdb.search(
+ root_key_container_dn,
+ scope=ldb.SCOPE_ONELEVEL,
+ attrs=list(confidential_attrs),
+ )
+ # Assert that at least one key exists.
+ self.assertLess(0, len(root_keys))
+
+ # Assert that the root keys all have confidential material.
+ for msg in root_keys:
+ self.assertTrue(confidential_values_present(msg))
+
+ # Assert that the recovery information object has confidential material.
+ recovery_info = samdb.search(
+ recovery_dn, scope=ldb.SCOPE_BASE, attrs=list(confidential_attrs)
+ )
+ self.assertTrue(confidential_values_present(recovery_info[0]))
+
+ # We don’t need the original samdb any more.
+ del samdb
+
# exclude secrets when we create the backup
backup_file = self.create_backup(extra_args=["--no-secrets"])
self.restore_backup(backup_file)
lp = self.check_restored_smbconf()
# assert that we don't find user secrets in the DB
- self.check_restored_database(lp, expect_secrets=False)
+ restored_samdb = self.check_restored_database(lp, expect_secrets=False)
+
+ # For the following searches, search across naming contexts to ensure we
+ # don’t miss any objects.
+ cross_ncs = ["search_options:1:2"]
+
+ # Search the entire restored database for GKDI root keys.
+ restored_root_keys = restored_samdb.search(
+ restored_samdb.get_root_basedn(),
+ expression="objectClass=msKds-ProvRootKey",
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=list(confidential_attrs),
+ controls=cross_ncs,
+ )
+ # Assert that no keys exist.
+ self.assertEqual(0, len(restored_root_keys))
+
+ # Search the entire restored database for recovery info objects.
+ recovery_info_objs = restored_samdb.search(
+ restored_samdb.get_root_basedn(),
+ expression="objectClass=msFVE-RecoveryInformation",
+ scope=ldb.SCOPE_SUBTREE,
+ attrs=list(confidential_attrs),
+ controls=cross_ncs,
+ )
+ # Assert that no recovery info exists.
+ self.assertEqual(0, len(recovery_info_objs))
def _test_backup_restore_into_site(self):
"""Does a backup and restores into a non-default site"""
self.assertRaises(KeyError, samdb.searchone, "unicodePwd", user_dn)
def assert_secrets(self, samdb, expect_secrets):
- """Check the user secrets in the restored DB match what's expected"""
+ """Check the secrets in the restored DB match what's expected"""
# check secrets for the built-in testenv users match what's expected
test_users = ["alice", "bob", "jane", "joe"]