from samba.tests.krb5.as_req_tests import AsReqBaseTest
from samba.tests.krb5 import kcrypto
-from samba.tests.krb5.kdc_base_test import KDCBaseTest
+from samba.tests.krb5.kdc_tgs_tests import KdcTgsBaseTests
from samba.tests.krb5.raw_testcase import KerberosCredentials
import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
from samba.tests.krb5.rfc4120_constants import (
KDC_ERR_CLIENT_REVOKED,
+ KDC_ERR_KEY_EXPIRED,
KDC_ERR_PREAUTH_FAILED,
KRB_AS_REP,
KRB_ERROR,
return ConnectionResult.SUCCESS
-class LockoutTests(KDCBaseTest):
+class LockoutTests(KdcTgsBaseTests):
def setUp(self):
super().setUp()
def test_lockout_transaction_kdc_ntstatus(self):
self.do_lockout_transaction(partial(connect_kdc, expect_status=True))
+ # Test that performing AS‐REQs with accounts in various states of
+ # unusability results in appropriate NTSTATUS and Kerberos error codes.
+
+ def test_lockout_status_disabled(self):
+ self._run_lockout_status(
+ self._get_creds_disabled(),
+ expected_status=ntstatus.NT_STATUS_ACCOUNT_DISABLED,
+ expected_error=KDC_ERR_CLIENT_REVOKED,
+ )
+
+ def test_lockout_status_locked_out(self):
+ self._run_lockout_status(
+ self._get_creds_locked_out(),
+ expected_status=ntstatus.NT_STATUS_ACCOUNT_LOCKED_OUT,
+ expected_error=KDC_ERR_CLIENT_REVOKED,
+ )
+
+ def test_lockout_status_expired(self):
+ self._run_lockout_status(
+ self._get_creds_expired(),
+ expected_status=ntstatus.NT_STATUS_ACCOUNT_EXPIRED,
+ expected_error=KDC_ERR_CLIENT_REVOKED,
+ )
+
+ def test_lockout_status_must_change(self):
+ self._run_lockout_status(
+ self._get_creds_must_change(),
+ expected_status=ntstatus.NT_STATUS_PASSWORD_MUST_CHANGE,
+ expected_error=KDC_ERR_KEY_EXPIRED,
+ )
+
+ def test_lockout_status_password_expired(self):
+ self._run_lockout_status(
+ self._get_creds_password_expired(),
+ expected_status=ntstatus.NT_STATUS_PASSWORD_EXPIRED,
+ expected_error=KDC_ERR_KEY_EXPIRED,
+ )
+
+ # Test that performing the same AS‐REQs, this time with FAST, does not
+ # result in NTSTATUS codes.
+
+ def test_lockout_status_disabled_fast(self):
+ self._run_lockout_status_fast(
+ self._get_creds_disabled(), expected_error=KDC_ERR_CLIENT_REVOKED
+ )
+
+ def test_lockout_status_locked_out_fast(self):
+ self._run_lockout_status_fast(
+ self._get_creds_locked_out(), expected_error=KDC_ERR_CLIENT_REVOKED
+ )
+
+ def test_lockout_status_expired_fast(self):
+ self._run_lockout_status_fast(
+ self._get_creds_expired(), expected_error=KDC_ERR_CLIENT_REVOKED
+ )
+
+ def test_lockout_status_must_change_fast(self):
+ self._run_lockout_status_fast(
+ self._get_creds_must_change(), expected_error=KDC_ERR_KEY_EXPIRED
+ )
+
+ def test_lockout_status_password_expired_fast(self):
+ self._run_lockout_status_fast(
+ self._get_creds_password_expired(), expected_error=KDC_ERR_KEY_EXPIRED
+ )
+
+ def _get_creds_disabled(self):
+ return self.get_cached_creds(
+ account_type=self.AccountType.USER, opts={"enabled": False}
+ )
+
+ def _get_creds_locked_out(self) -> KerberosCredentials:
+ samdb = self.get_samdb()
+
+ user_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER, use_cache=False
+ )
+ user_dn = user_creds.get_dn()
+
+ # Lock out the account.
+
+ old_utf16pw = '"Secret007"'.encode("utf-16le") # invalid pwd
+ new_utf16pw = '"Secret008"'.encode("utf-16le")
+
+ msg = ldb.Message(user_dn)
+ msg["0"] = ldb.MessageElement(old_utf16pw, ldb.FLAG_MOD_DELETE, "unicodePwd")
+ msg["1"] = ldb.MessageElement(new_utf16pw, ldb.FLAG_MOD_ADD, "unicodePwd")
+
+ for _ in range(self.lockout_threshold):
+ try:
+ samdb.modify(msg)
+ except ldb.LdbError as err:
+ num, _ = err.args
+
+ # We get an error, but the bad password count should
+ # still be updated.
+ self.assertEqual(num, ldb.ERR_CONSTRAINT_VIOLATION)
+ else:
+ self.fail("pwd change should have failed")
+
+ # Ensure the account is locked out.
+
+ res = samdb.search(
+ user_dn, scope=ldb.SCOPE_BASE, attrs=["msDS-User-Account-Control-Computed"]
+ )
+ self.assertEqual(1, len(res))
+
+ uac = int(res[0].get("msDS-User-Account-Control-Computed", idx=0))
+ self.assertTrue(uac & dsdb.UF_LOCKOUT)
+
+ return user_creds
+
+ def _get_creds_expired(self) -> KerberosCredentials:
+ return self.get_cached_creds(
+ account_type=self.AccountType.USER,
+ opts={"additional_details": self.freeze({"accountExpires": "1"})},
+ )
+
+ def _get_creds_must_change(self) -> KerberosCredentials:
+ return self.get_cached_creds(
+ account_type=self.AccountType.USER,
+ opts={"additional_details": self.freeze({"pwdLastSet": "0"})},
+ )
+
+ def _get_creds_password_expired(self) -> KerberosCredentials:
+ samdb = self.get_samdb()
+ self.addCleanup(samdb.set_maxPwdAge, samdb.get_maxPwdAge())
+ low_pwd_age = -2
+ samdb.set_maxPwdAge(low_pwd_age)
+
+ return self.get_cached_creds(account_type=self.AccountType.USER)
+
+ def _run_lockout_status(
+ self,
+ user_creds: KerberosCredentials,
+ *,
+ expected_status: int,
+ expected_error: int,
+ ) -> None:
+ user_name = user_creds.get_username()
+ cname = self.PrincipalName_create(
+ name_type=NT_PRINCIPAL, names=user_name.split("/")
+ )
+
+ krbtgt_creds = self.get_krbtgt_creds()
+ realm = krbtgt_creds.get_realm()
+
+ sname = self.get_krbtgt_sname()
+
+ preauth_key = self.PasswordKey_from_creds(user_creds, kcrypto.Enctype.AES256)
+
+ ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(preauth_key)
+ padata = [ts_enc_padata]
+
+ def _generate_padata_copy(_kdc_exchange_dict, _callback_dict, req_body):
+ return padata, req_body
+
+ kdc_exchange_dict = self.as_exchange_dict(
+ creds=user_creds,
+ expected_crealm=realm,
+ expected_cname=cname,
+ expected_srealm=realm,
+ expected_sname=sname,
+ expected_account_name=user_name,
+ expected_supported_etypes=krbtgt_creds.tgs_supported_enctypes,
+ expect_edata=True,
+ expect_status=True,
+ expected_status=expected_status,
+ ticket_decryption_key=self.TicketDecryptionKey_from_creds(krbtgt_creds),
+ generate_padata_fn=_generate_padata_copy,
+ check_error_fn=self.generic_check_kdc_error,
+ check_rep_fn=None,
+ check_kdc_private_fn=self.generic_check_kdc_private,
+ expected_error_mode=expected_error,
+ expected_salt=user_creds.get_salt(),
+ preauth_key=preauth_key,
+ kdc_options=str(krb5_asn1.KDCOptions("postdated")),
+ pac_request=True,
+ )
+
+ # Try making a Kerberos AS-REQ to the KDC. This might fail, either due
+ # to the user's account being locked out or due to using the wrong
+ # password.
+ self._generic_kdc_exchange(
+ kdc_exchange_dict,
+ cname=cname,
+ realm=realm,
+ sname=sname,
+ till_time=self.get_KerberosTime(offset=36000),
+ etypes=self.get_default_enctypes(user_creds),
+ )
+
+ def _run_lockout_status_fast(
+ self, user_creds: KerberosCredentials, *, expected_error: int
+ ) -> None:
+ self._armored_as_req(
+ user_creds,
+ self.get_krbtgt_creds(),
+ self.get_tgt(self.get_mach_creds()),
+ expected_error=expected_error,
+ expect_edata=self.expect_padata_outer,
+ # FAST‐armored responses never contain an NTSTATUS code.
+ expect_status=False,
+ )
+
def test_lockout_transaction_ntlm(self):
self.do_lockout_transaction(connect_ntlm)