sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
-from typing import Iterable, NewType, Optional, Tuple, TypeVar
+from typing import Callable, Iterable, NewType, Optional, Tuple, TypeVar
import datetime
from itertools import chain
import ldb
-from samba import auth, dsdb, gensec, ntstatus, NTSTATUSError, werror
-from samba.dcerpc import gkdi, gmsa, misc, netlogon, security
+from samba import (
+ auth,
+ dsdb,
+ generate_random_password,
+ gensec,
+ ntstatus,
+ NTSTATUSError,
+ werror,
+)
+from samba.dcerpc import gkdi, gmsa, misc, netlogon, security, srvsvc
from samba.ndr import ndr_pack, ndr_unpack
from samba.nt_time import (
nt_time_delta_from_timedelta,
def test_gmsa_can_authenticate_to_ldap_without_kerberos(self):
self._gmsa_can_authenticate_to_ldap(with_kerberos=False)
+ def test_gmsa_can_perform_ServerAuthenticate3(self):
+ creds = self.gmsa_account(kerberos_enabled=False)
+ username = creds.get_username()
+ dc_server = self.get_samdb().host_dns_name()
+
+ # Use the gMSA’s credentials to create a netlogon connection. This call,
+ # which internally performs a ServerAuthenticate3, is more than just
+ # setup: it is the centrepiece of the test.
+ c = netlogon.netlogon(
+ f"ncacn_ip_tcp:{dc_server}[schannel,seal]", self.get_lp(), creds
+ )
+ credential = netlogon.netr_Credential()
+ credential.data = list(b"abcdefgh")
+ server_credential = c.netr_ServerReqChallenge(None, username, credential)
+ with self.assertRaises(NTSTATUSError) as err:
+ # Try performing a ServerAuthenticate3 with our gMSA account. The
+ # procedure for calculating a correct challenge is too complicated
+ # to try to reimplement in Python, so we won’t even try. But the
+ # fact that we get an ACCESS_DENIED error, rather than something
+ # like NO_TRUST_SAM_ACCOUNT, shows that gMSAs are not prevented from
+ # using ServerAuthenticate3 to authenticate.
+ c.netr_ServerAuthenticate3(
+ dc_server,
+ username,
+ misc.SEC_CHAN_WKSTA,
+ username,
+ server_credential,
+ netlogon.NETLOGON_NEG_STRONG_KEYS | netlogon.NETLOGON_NEG_SUPPORTS_AES,
+ )
+
+ self.assertEqual(ntstatus.NT_STATUS_ACCESS_DENIED, err.exception.args[0])
+
+ def test_gmsa_cannot_be_locked_out_with_gensec_ntlmssp(self):
+ def try_bad_creds(creds: Credentials, samdb: SamDB) -> None:
+ self.gensec_ntlmssp_logon(creds, samdb, expect_success=False)
+
+ self._check_gmsa_cannot_be_locked_out(
+ try_bad_creds_fn=try_bad_creds, kerberos_enabled=False, local=True
+ )
+
+ def test_gmsa_cannot_be_locked_out_with_ldap_authentication(self):
+ def try_bad_creds(creds: Credentials, _samdb: SamDB) -> None:
+ with self.assertRaises(ldb.LdbError) as err:
+ SamDB(url=f"ldap://{self.dc_host}", credentials=creds, lp=self.get_lp())
+
+ num, estr = err.exception.args
+
+ self.assertEqual(ldb.ERR_INVALID_CREDENTIALS, num)
+ self.assertIn("NT_STATUS_LOGON_FAILURE", estr)
+
+ self._check_gmsa_cannot_be_locked_out(try_bad_creds_fn=try_bad_creds)
+
+ def _check_gmsa_cannot_be_locked_out(
+ self,
+ *,
+ try_bad_creds_fn: Callable[[Credentials, SamDB], None],
+ kerberos_enabled: bool = True,
+ local: bool = False,
+ ):
+ samdb = self.get_local_samdb() if local else self.get_samdb()
+ base_dn = ldb.Dn(samdb, samdb.domain_dn())
+
+ def modify_attr(attr, value):
+ if value is None:
+ value = []
+ flag = ldb.FLAG_MOD_DELETE
+ else:
+ value = str(value)
+ flag = ldb.FLAG_MOD_REPLACE
+
+ msg = ldb.Message(base_dn)
+ msg[attr] = ldb.MessageElement(value, flag, attr)
+ samdb.modify(msg)
+
+ res = samdb.search(base_dn, scope=ldb.SCOPE_BASE, attrs=["lockoutThreshold"])
+ self.assertEqual(1, len(res))
+
+ # Reset the lockout threshold as it was before.
+ lockout_threshold = res[0].get("lockoutThreshold", idx=0)
+ self.addCleanup(modify_attr, "lockoutThreshold", lockout_threshold)
+
+ # Set the new lockout threshold.
+ lockout_threshold = 3
+ modify_attr("lockoutThreshold", lockout_threshold)
+
+ creds = self.gmsa_account(kerberos_enabled=kerberos_enabled)
+ dn = ldb.Dn(samdb, str(creds.get_dn()))
+
+ # Truncate the password to ensure that it is invalid.
+ creds.set_password(creds.get_password()[:-1])
+
+ prev_bad_pwd_time = 0
+
+ for i in range(lockout_threshold + 1):
+ try_bad_creds_fn(creds, samdb)
+
+ # Ensure the account is not locked out.
+
+ res = samdb.search(
+ dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=[
+ "badPasswordTime",
+ "badPwdCount",
+ "lockoutTime",
+ "msDS-User-Account-Control-Computed",
+ ],
+ )
+ self.assertEqual(1, len(res))
+
+ # Despite the bad password count having increased, …
+ bad_pwd_count = int(res[0].get("badPwdCount", idx=0))
+ self.assertEqual(i + 1, bad_pwd_count)
+
+ # …the account should not be locked out.
+ uac = int(res[0].get("msDS-User-Account-Control-Computed", idx=0))
+ self.assertFalse(uac & dsdb.UF_LOCKOUT)
+
+ # The bad password time should have increased.
+ bad_pwd_time = int(res[0].get("badPasswordTime", idx=0))
+ self.assertGreater(bad_pwd_time, prev_bad_pwd_time)
+
+ prev_bad_pwd_time = bad_pwd_time
+
+ # The lockout time should not be set.
+ lockout_time = res[0].get("lockoutTime", idx=0)
+ self.assertIsNone(lockout_time)
+
+ def _server_set_password(self, creds: Credentials, password: str) -> None:
+ dc_server = self.get_samdb().host_dns_name()
+ lp = self.get_lp()
+
+ conn = netlogon.netlogon(f"ncacn_ip_tcp:{dc_server}[schannel,seal]", lp, creds)
+
+ auth = creds.new_client_authenticator()
+ authenticator = netlogon.netr_Authenticator()
+ authenticator.cred.data = list(auth["credential"])
+ authenticator.timestamp = auth["timestamp"]
+
+ DATA_LEN = 512
+
+ encoded = password.encode("utf-16-le")
+ pwd_len = len(encoded)
+ filler = os.urandom(DATA_LEN - pwd_len)
+
+ pwd = netlogon.netr_CryptPassword()
+ pwd.length = pwd_len
+ pwd.data = list(filler + encoded)
+ creds.encrypt_netr_crypt_password(pwd)
+
+ conn.netr_ServerPasswordSet2(
+ dc_server,
+ creds.get_username(),
+ misc.SEC_CHAN_WKSTA,
+ creds.get_workstation(),
+ authenticator,
+ pwd,
+ )
+
+ def test_gmsa_can_authenticate_with_previous_password_and_ntlm(self):
+ creds = self.gmsa_account(kerberos_enabled=False)
+ dc_server = self.get_samdb().host_dns_name()
+ lp = self.get_lp()
+
+ # We can use NTLM to authenticate.
+ srvsvc.srvsvc(f"ncacn_np:{dc_server}", lp, creds)
+
+ PWD_LEN = 32
+
+ # Change the password using ServerPasswordSet2. Windows does not prevent
+ # this, despite the whole point of Group Managed Service Accounts being
+ # that the password is managed by AD, and despite changing passwords
+ # outside of that system not making much sense.
+ password_1 = generate_random_password(PWD_LEN, PWD_LEN)
+ self._server_set_password(creds, password_1)
+
+ # As less than five minutes have passed, we can still authenticate with
+ # our original password.
+ srvsvc.srvsvc(f"ncacn_np:{dc_server}", lp, creds)
+
+ # Change the password again.
+ password_2 = generate_random_password(PWD_LEN, PWD_LEN)
+ self._server_set_password(creds, password_2)
+
+ # This time, NTLM authentication fails!
+ with self.assertRaises(NTSTATUSError) as err:
+ srvsvc.srvsvc(f"ncacn_np:{dc_server}", lp, creds)
+
+ self.assertEqual(ntstatus.NT_STATUS_LOGON_FAILURE, err.exception.args[0])
+
+ # But we can use the previous password to authenticate.
+ creds.set_password(password_1)
+ srvsvc.srvsvc(f"ncacn_np:{dc_server}", lp, creds)
+
+ # And we can authenticate using the current password.
+ creds.set_password(password_2)
+ srvsvc.srvsvc(f"ncacn_np:{dc_server}", lp, creds)
+
if __name__ == "__main__":
import unittest