sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
-from typing import Callable, Iterable, NewType, Optional, Tuple, TypeVar
+from typing import Callable, Iterable, NewType, Optional, Set, Tuple, TypeVar
import datetime
from itertools import chain
)
from samba.dcerpc import gkdi, gmsa, misc, netlogon, security, srvsvc
from samba.ndr import ndr_pack, ndr_unpack
+from samba.net import Net
from samba.nt_time import (
nt_time_delta_from_timedelta,
nt_time_from_datetime,
)
from samba.tests import connect_samdb
+from samba.tests.dckeytab import keytab_as_set
from samba.tests.krb5 import kcrypto
from samba.tests.gkdi import GkdiBaseTest, ROOT_KEY_START_TIME
from samba.tests.krb5.kdc_base_test import KDCBaseTest
# Expect the gensec logon to fail.
self.gensec_ntlmssp_logon(creds, samdb, expect_success=False)
+ def test_gmsa_keys_when_previous_password_is_not_acceptable(self):
+ self._check_gmsa_keys(within_valid_window=False, expect_previous_keys=False)
+
+ def test_gmsa_keys_when_previous_password_is_acceptable(self):
+ self._check_gmsa_keys(within_valid_window=True, expect_previous_keys=True)
+
+ def _check_gmsa_keys(
+ self, *, within_valid_window: bool, expect_previous_keys: bool
+ ):
+ password_interval = 77
+
+ samdb = self.get_local_samdb()
+ series = self.gmsa_series(password_interval)
+ self.set_db_time(samdb, series.start_of_interval(0))
+
+ creds = self.gmsa_account(samdb=samdb, interval=password_interval)
+
+ if within_valid_window:
+ db_time = series.within_previous_password_valid_window(1)
+ else:
+ db_time = series.outside_previous_password_valid_window(1)
+ self.set_db_time(samdb, db_time)
+
+ gmsa_principal = f"{creds.get_username()}@{creds.get_realm()}"
+
+ ktfile = os.path.join(self.tempdir, "test.keytab")
+ self.addCleanup(self.rm_files, ktfile)
+
+ net = Net(None, self.get_lp())
+ net.export_keytab(
+ keytab=ktfile,
+ samdb=samdb,
+ principal=gmsa_principal,
+ only_current_keys=True,
+ as_for_AS_REQ=True,
+ )
+ self.assertTrue(os.path.exists(ktfile), "keytab was not created")
+
+ with open(ktfile, "rb") as bytes_kt:
+ keytab_bytes = bytes_kt.read()
+
+ keytab_set = keytab_as_set(keytab_bytes)
+ exported_etypes = {entry[1] for entry in keytab_set}
+
+ # Ensure that the AES keys were exported.
+ self.assertLessEqual(
+ {kcrypto.Enctype.AES256, kcrypto.Enctype.AES128}, exported_etypes
+ )
+
+ def fill_keytab(
+ creds: KerberosCredentials,
+ keytab: Set[Tuple[str, kcrypto.Enctype, int, bytes]],
+ etypes: Iterable[kcrypto.Enctype],
+ ) -> None:
+ for etype in etypes:
+ key = self.TicketDecryptionKey_from_creds(creds, etype=etype)
+ kvno = 2
+ entry = gmsa_principal, etype, kvno, key.key.contents
+
+ self.assertNotIn(entry, keytab, "key already present in keytab")
+ keytab.add(entry)
+
+ expected_keytab = set()
+
+ if expect_previous_keys:
+ # Fill the expected keytab with the previous keys.
+ fill_keytab(creds, expected_keytab, exported_etypes)
+
+ # Calculate the new password.
+ managed_pwd = self.expected_gmsa_password_blob(
+ samdb,
+ creds,
+ series.interval_gkid(1),
+ previous_gkid=series.interval_gkid(0),
+ query_expiration_gkid=series.interval_gkid(2),
+ )
+
+ # Set the new password.
+ self.assertIsNotNone(
+ managed_pwd.passwords.current, "current password must be present"
+ )
+ creds.set_utf16_password(managed_pwd.passwords.current)
+
+ # Clear the initial set of keys associated with this credentials object.
+ creds.clear_forced_keys()
+ # Add the current keys to the expected keytab.
+ fill_keytab(creds, expected_keytab, exported_etypes)
+
+ # Ensure the keytab is as we expect.
+ self.assertEqual(expected_keytab, keytab_set)
+
def test_gmsa_can_perform_netlogon(self):
self._test_samlogon(
self.gmsa_account(kerberos_enabled=False),