From: Garming Sam Date: Fri, 21 Apr 2017 03:21:58 +0000 (+1200) Subject: tests/rodc: Add password lockout tests with RODC-auth, RWDC-check X-Git-Tag: ldb-1.1.30~35 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7dfe7df6d0bb211d9a2711d532643ad0b3f7b429;p=thirdparty%2Fsamba.git tests/rodc: Add password lockout tests with RODC-auth, RWDC-check This occurs when the password is preloaded, and the bad logins and successes must be forwarded the the RWDC. The password server MUST be localdc. Signed-off-by: Garming Sam Reviewed-by: Andrew Bartlett --- diff --git a/source4/dsdb/tests/python/password_lockout_base.py b/source4/dsdb/tests/python/password_lockout_base.py index 1ea6e50cceb..992f51d7822 100644 --- a/source4/dsdb/tests/python/password_lockout_base.py +++ b/source4/dsdb/tests/python/password_lockout_base.py @@ -105,7 +105,8 @@ class BasePasswordTestCase(samba.tests.TestCase): userAccountControl=None, msDSUserAccountControlComputed=None, effective_bad_password_count=None, - msg=None): + msg=None, + badPwdCountOnly=False): print '-=' * 36 if msg is not None: print "\033[01;32m %s \033[00m\n" % msg @@ -128,17 +129,18 @@ class BasePasswordTestCase(samba.tests.TestCase): res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) self.assertTrue(len(res) == 1) self._check_attribute(res, "badPwdCount", badPwdCount) - self._check_attribute(res, "badPasswordTime", badPasswordTime) - self._check_attribute(res, "logonCount", logonCount) - self._check_attribute(res, "lastLogon", lastLogon) - self._check_attribute(res, "lastLogonTimestamp", lastLogonTimestamp) self._check_attribute(res, "lockoutTime", lockoutTime) - self._check_attribute(res, "userAccountControl", userAccountControl) - self._check_attribute(res, "msDS-User-Account-Control-Computed", - msDSUserAccountControlComputed) + self._check_attribute(res, "badPasswordTime", badPasswordTime) + if not badPwdCountOnly: + self._check_attribute(res, "logonCount", logonCount) + self._check_attribute(res, "lastLogon", lastLogon) + self._check_attribute(res, "lastLogonTimestamp", lastLogonTimestamp) + self._check_attribute(res, "userAccountControl", userAccountControl) + self._check_attribute(res, "msDS-User-Account-Control-Computed", + msDSUserAccountControlComputed) - lastLogon = int(res[0]["lastLogon"][0]) - logonCount = int(res[0]["logonCount"][0]) + lastLogon = int(res[0]["lastLogon"][0]) + logonCount = int(res[0]["logonCount"][0]) samr_user = self._open_samr_user(res) uinfo3 = self.samr.QueryUserInfo(samr_user, 3) @@ -148,16 +150,21 @@ class BasePasswordTestCase(samba.tests.TestCase): self.samr.Close(samr_user) expected_acb_info = 0 - if userAccountControl & dsdb.UF_NORMAL_ACCOUNT: - expected_acb_info |= samr.ACB_NORMAL - if userAccountControl & dsdb.UF_ACCOUNTDISABLE: - expected_acb_info |= samr.ACB_DISABLED - if userAccountControl & dsdb.UF_PASSWD_NOTREQD: - expected_acb_info |= samr.ACB_PWNOTREQ - if msDSUserAccountControlComputed & dsdb.UF_LOCKOUT: - expected_acb_info |= samr.ACB_AUTOLOCK - if msDSUserAccountControlComputed & dsdb.UF_PASSWORD_EXPIRED: - expected_acb_info |= samr.ACB_PW_EXPIRED + if not badPwdCountOnly: + if userAccountControl & dsdb.UF_NORMAL_ACCOUNT: + expected_acb_info |= samr.ACB_NORMAL + if userAccountControl & dsdb.UF_ACCOUNTDISABLE: + expected_acb_info |= samr.ACB_DISABLED + if userAccountControl & dsdb.UF_PASSWD_NOTREQD: + expected_acb_info |= samr.ACB_PWNOTREQ + if msDSUserAccountControlComputed & dsdb.UF_LOCKOUT: + expected_acb_info |= samr.ACB_AUTOLOCK + if msDSUserAccountControlComputed & dsdb.UF_PASSWORD_EXPIRED: + expected_acb_info |= samr.ACB_PW_EXPIRED + + self.assertEquals(uinfo3.acct_flags, expected_acb_info) + self.assertEquals(uinfo3.last_logon, lastLogon) + self.assertEquals(uinfo3.logon_count, logonCount) expected_bad_password_count = 0 if badPwdCount is not None: @@ -165,22 +172,21 @@ class BasePasswordTestCase(samba.tests.TestCase): if effective_bad_password_count is None: effective_bad_password_count = expected_bad_password_count - self.assertEquals(uinfo3.acct_flags, expected_acb_info) self.assertEquals(uinfo3.bad_password_count, expected_bad_password_count) - self.assertEquals(uinfo3.last_logon, lastLogon) - self.assertEquals(uinfo3.logon_count, logonCount) - self.assertEquals(uinfo5.acct_flags, expected_acb_info) - self.assertEquals(uinfo5.bad_password_count, effective_bad_password_count) - self.assertEquals(uinfo5.last_logon, lastLogon) - self.assertEquals(uinfo5.logon_count, logonCount) + if not badPwdCountOnly: + self.assertEquals(uinfo5.acct_flags, expected_acb_info) + self.assertEquals(uinfo5.bad_password_count, effective_bad_password_count) + self.assertEquals(uinfo5.last_logon, lastLogon) + self.assertEquals(uinfo5.logon_count, logonCount) + + self.assertEquals(uinfo16.acct_flags, expected_acb_info) - self.assertEquals(uinfo16.acct_flags, expected_acb_info) + self.assertEquals(uinfo21.acct_flags, expected_acb_info) + self.assertEquals(uinfo21.bad_password_count, effective_bad_password_count) + self.assertEquals(uinfo21.last_logon, lastLogon) + self.assertEquals(uinfo21.logon_count, logonCount) - self.assertEquals(uinfo21.acct_flags, expected_acb_info) - self.assertEquals(uinfo21.bad_password_count, effective_bad_password_count) - self.assertEquals(uinfo21.last_logon, lastLogon) - self.assertEquals(uinfo21.logon_count, logonCount) # check LDAP again and make sure the samr.QueryUserInfo # doesn't have any impact. diff --git a/source4/dsdb/tests/python/rodc_rwdc.py b/source4/dsdb/tests/python/rodc_rwdc.py index 85fa85df187..87d1257d97f 100644 --- a/source4/dsdb/tests/python/rodc_rwdc.py +++ b/source4/dsdb/tests/python/rodc_rwdc.py @@ -112,7 +112,473 @@ def get_server_ref_from_samdb(samdb): return res[0]['serverReference'][0] +class RodcRwdcCachedTests(password_lockout_base.BasePasswordTestCase): + counter = itertools.count(1).next + + def _check_account_initial(self, dn): + self.force_replication() + return super(RodcRwdcCachedTests, self)._check_account_initial(dn) + + def _check_account(self, dn, + badPwdCount=None, + badPasswordTime=None, + logonCount=None, + lastLogon=None, + lastLogonTimestamp=None, + lockoutTime=None, + userAccountControl=None, + msDSUserAccountControlComputed=None, + effective_bad_password_count=None, + msg=None, + badPwdCountOnly=False): + # Wait for the RWDC to get any delayed messages + # e.g. SendToSam or KRB5 bad passwords via winbindd + if (self.kerberos and isinstance(badPasswordTime, tuple) or + badPwdCount == 0): + time.sleep(5) + + return super(RodcRwdcCachedTests, + self)._check_account(dn, badPwdCount, badPasswordTime, + logonCount, lastLogon, + lastLogonTimestamp, lockoutTime, + userAccountControl, + msDSUserAccountControlComputed, + effective_bad_password_count, msg, + True) + + def force_replication(self, base=None): + if base is None: + base = self.base_dn + + # XXX feels like a horrendous way to do it. + credstring = '-U%s%%%s' % (CREDS.get_username(), + CREDS.get_password()) + cmd = ['bin/samba-tool', + 'drs', 'replicate', + RODC, RWDC, base, + credstring, + '--sync-forced'] + + p = subprocess.Popen(cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE) + stdout, stderr = p.communicate() + if p.returncode: + print "failed with code %s" % p.returncode + print ' '.join(cmd) + print "stdout" + print stdout + print "stderr" + print stderr + raise RodcRwdcTestException() + + def tearDown(self): + super(RodcRwdcCachedTests, self).tearDown() + set_auto_replication(RWDC, True) + + def setUp(self): + self.kerberos = False # To be set later + + self.rodc_db = SamDB('ldap://%s' % RODC, credentials=CREDS, + session_info=system_session(LP), lp=LP) + + self.rwdc_db = SamDB('ldap://%s' % RWDC, credentials=CREDS, + session_info=system_session(LP), lp=LP) + + # Define variables for BasePasswordTestCase + self.lp = LP + self.global_creds = CREDS + self.host = RWDC + self.host_url = 'ldap://%s' % RWDC + self.ldb = SamDB(url='ldap://%s' % RWDC, session_info=system_session(self.lp), + credentials=self.global_creds, lp=self.lp) + + super(RodcRwdcCachedTests, self).setUp() + self.host_url = 'ldap://%s' % RODC + + self.samr = samr.samr("ncacn_ip_tcp:%s[seal]" % self.host, self.lp, self.global_creds) + self.samr_handle = self.samr.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED) + self.samr_domain = self.samr.OpenDomain(self.samr_handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid) + + self.base_dn = self.rwdc_db.domain_dn() + + root = self.rodc_db.search(base='', scope=ldb.SCOPE_BASE, + attrs=['dsServiceName']) + self.service = root[0]['dsServiceName'][0] + self.tag = uuid.uuid4().hex + + self.rwdc_dsheuristics = self.rwdc_db.get_dsheuristics() + self.rwdc_db.set_dsheuristics("000000001") + + set_auto_replication(RWDC, False) + + # make sure DCs are synchronized before the test + self.force_replication() + + def test_login_lockout_krb5(self): + username = self.lockout1krb5_creds.get_username() + userpass = self.lockout1krb5_creds.get_password() + userdn = "cn=%s,cn=users,%s" % (username, self.base_dn) + + preload_rodc_user(userdn) + + self.kerberos = True + + self.rodc_dn = get_server_ref_from_samdb(self.rodc_db) + + res = self.rodc_db.search(self.rodc_dn, + scope=ldb.SCOPE_BASE, + attrs=['msDS-RevealOnDemandGroup']) + + group = res[0]['msDS-RevealOnDemandGroup'][0] + + m = ldb.Message() + m.dn = ldb.Dn(self.rwdc_db, group) + m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member') + self.rwdc_db.modify(m) + + m = ldb.Message() + m.dn = ldb.Dn(self.ldb, self.base_dn) + + self.account_lockout_duration = 10 + account_lockout_duration_ticks = -int(self.account_lockout_duration * (1e7)) + + m["lockoutDuration"] = ldb.MessageElement(str(account_lockout_duration_ticks), + ldb.FLAG_MOD_REPLACE, + "lockoutDuration") + + self.lockout_observation_window = 10 + lockout_observation_window_ticks = -int(self.lockout_observation_window * (1e7)) + + m["lockOutObservationWindow"] = ldb.MessageElement(str(lockout_observation_window_ticks), + ldb.FLAG_MOD_REPLACE, + "lockOutObservationWindow") + + self.rwdc_db.modify(m) + self.force_replication() + + self._test_login_lockout_rodc_rwdc(self.lockout1krb5_creds, userdn) + + def test_login_lockout_ntlm(self): + username = self.lockout1ntlm_creds.get_username() + userpass = self.lockout1ntlm_creds.get_password() + userdn = "cn=%s,cn=users,%s" % (username, self.base_dn) + + preload_rodc_user(userdn) + + self.kerberos = False + + self.rodc_dn = get_server_ref_from_samdb(self.rodc_db) + + res = self.rodc_db.search(self.rodc_dn, + scope=ldb.SCOPE_BASE, + attrs=['msDS-RevealOnDemandGroup']) + + group = res[0]['msDS-RevealOnDemandGroup'][0] + + m = ldb.Message() + m.dn = ldb.Dn(self.rwdc_db, group) + m['member'] = ldb.MessageElement(userdn, ldb.FLAG_MOD_ADD, 'member') + self.rwdc_db.modify(m) + + self._test_login_lockout_rodc_rwdc(self.lockout1ntlm_creds, userdn) + + def _test_login_lockout_rodc_rwdc(self, creds, userdn): + username = creds.get_username() + userpass = creds.get_password() + + # Open a second LDB connection with the user credentials. Use the + # command line credentials for informations like the domain, the realm + # and the workstation. + creds_lockout = self.insta_creds(creds) + + # The wrong password + creds_lockout.set_password("thatsAcomplPASS1x") + + self.assertLoginFailure(self.host_url, creds_lockout, self.lp) + + badPasswordTime = 0 + logonCount = 0 + lastLogon = 0 + lastLogonTimestamp=0 + logoncount_relation = '' + lastlogon_relation = '' + + res = self._check_account(userdn, + badPwdCount=1, + badPasswordTime=("greater", badPasswordTime), + logonCount=logonCount, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0, + msg='lastlogontimestamp with wrong password') + badPasswordTime = int(res[0]["badPasswordTime"][0]) + + # Correct old password + creds_lockout.set_password(userpass) + + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp) + + # lastLogonTimestamp should not change + # lastLogon increases if badPwdCount is non-zero (!) + res = self._check_account(userdn, + badPwdCount=0, + badPasswordTime=badPasswordTime, + logonCount=(logoncount_relation, logonCount), + lastLogon=('greater', lastLogon), + lastLogonTimestamp=lastLogonTimestamp, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0, + msg='LLTimestamp is updated to lastlogon') + + logonCount = int(res[0]["logonCount"][0]) + lastLogon = int(res[0]["lastLogon"][0]) + + # The wrong password + creds_lockout.set_password("thatsAcomplPASS1x") + + self.assertLoginFailure(self.host_url, creds_lockout, self.lp) + + res = self._check_account(userdn, + badPwdCount=1, + badPasswordTime=("greater", badPasswordTime), + logonCount=logonCount, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0) + badPasswordTime = int(res[0]["badPasswordTime"][0]) + + # The wrong password + creds_lockout.set_password("thatsAcomplPASS1x") + + try: + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp) + self.fail() + + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + + res = self._check_account(userdn, + badPwdCount=2, + badPasswordTime=("greater", badPasswordTime), + logonCount=logonCount, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0) + badPasswordTime = int(res[0]["badPasswordTime"][0]) + + print "two failed password change" + + # The wrong password + creds_lockout.set_password("thatsAcomplPASS1x") + + try: + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp) + self.fail() + + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + + res = self._check_account(userdn, + badPwdCount=3, + badPasswordTime=("greater", badPasswordTime), + logonCount=logonCount, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + lockoutTime=("greater", badPasswordTime), + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=dsdb.UF_LOCKOUT) + badPasswordTime = int(res[0]["badPasswordTime"][0]) + lockoutTime = int(res[0]["lockoutTime"][0]) + + # The wrong password + creds_lockout.set_password("thatsAcomplPASS1x") + try: + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + + res = self._check_account(userdn, + badPwdCount=3, + badPasswordTime=badPasswordTime, + logonCount=logonCount, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + lockoutTime=lockoutTime, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=dsdb.UF_LOCKOUT) + + # The wrong password + creds_lockout.set_password("thatsAcomplPASS1x") + try: + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + + res = self._check_account(userdn, + badPwdCount=3, + badPasswordTime=badPasswordTime, + logonCount=logonCount, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + lockoutTime=lockoutTime, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=dsdb.UF_LOCKOUT) + + # The correct password, but we are locked out + creds_lockout.set_password(userpass) + try: + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + + res = self._check_account(userdn, + badPwdCount=3, + badPasswordTime=badPasswordTime, + logonCount=logonCount, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + lockoutTime=lockoutTime, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=dsdb.UF_LOCKOUT) + + # wait for the lockout to end + time.sleep(self.account_lockout_duration + 1) + print self.account_lockout_duration + 1 + + res = self._check_account(userdn, + badPwdCount=3, effective_bad_password_count=0, + badPasswordTime=badPasswordTime, + logonCount=logonCount, + lockoutTime=lockoutTime, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0) + + # The correct password after letting the timeout expire + + creds_lockout.set_password(userpass) + + creds_lockout2 = self.insta_creds(creds_lockout) + + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout2, lp=self.lp) + time.sleep(3) + + res = self._check_account(userdn, + badPwdCount=0, + badPasswordTime=badPasswordTime, + logonCount=(logoncount_relation, logonCount), + lastLogon=(lastlogon_relation, lastLogon), + lastLogonTimestamp=lastLogonTimestamp, + lockoutTime=lockoutTime, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0, + msg="lastLogon is way off") + + # The wrong password + creds_lockout.set_password("thatsAcomplPASS1x") + try: + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + + res = self._check_account(userdn, + badPwdCount=1, + badPasswordTime=("greater", badPasswordTime), + logonCount=logonCount, + lockoutTime=lockoutTime, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0) + badPasswordTime = int(res[0]["badPasswordTime"][0]) + + # The wrong password + creds_lockout.set_password("thatsAcomplPASS1x") + try: + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + + res = self._check_account(userdn, + badPwdCount=2, + badPasswordTime=("greater", badPasswordTime), + logonCount=logonCount, + lockoutTime=lockoutTime, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0) + badPasswordTime = int(res[0]["badPasswordTime"][0]) + + time.sleep(self.lockout_observation_window + 1) + + res = self._check_account(userdn, + badPwdCount=2, effective_bad_password_count=0, + badPasswordTime=badPasswordTime, + logonCount=logonCount, + lockoutTime=lockoutTime, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0) + + # The wrong password + creds_lockout.set_password("thatsAcomplPASS1x") + try: + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + res = self._check_account(userdn, + badPwdCount=1, + badPasswordTime=("greater", badPasswordTime), + logonCount=logonCount, + lockoutTime=lockoutTime, + lastLogon=lastLogon, + lastLogonTimestamp=lastLogonTimestamp, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0) + badPasswordTime = int(res[0]["badPasswordTime"][0]) + + # The correct password without letting the timeout expire + creds_lockout.set_password(userpass) + ldb_lockout = SamDB(url=self.host_url, credentials=creds_lockout, lp=self.lp) + + res = self._check_account(userdn, + badPwdCount=0, + badPasswordTime=badPasswordTime, + logonCount=(logoncount_relation, logonCount), + lockoutTime=lockoutTime, + lastLogon=("greater", lastLogon), + lastLogonTimestamp=lastLogonTimestamp, + userAccountControl= + dsdb.UF_NORMAL_ACCOUNT, + msDSUserAccountControlComputed=0) class RodcRwdcTests(password_lockout_base.BasePasswordTestCase): counter = itertools.count(1).next