]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
dsdb/tests: add test_ldap_bind_must_change_pwd()
authorStefan Metzmacher <metze@samba.org>
Thu, 2 Mar 2017 15:41:20 +0000 (16:41 +0100)
committerAndrew Bartlett <abartlet@samba.org>
Fri, 3 Mar 2017 07:59:16 +0000 (08:59 +0100)
This tests the error messages for failing LDAP Bind responses.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=9048

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
source4/dsdb/tests/python/sam.py

index f57454b7725fe5de6ba513c3640eb1a7659275d6..10775d331fe9ccfc47b7c7dc31a5dcc8d3b596f8 100755 (executable)
@@ -13,6 +13,7 @@ from samba.tests.subunitrun import SubunitOptions, TestProgram
 
 import samba.getopt as options
 
+from samba.credentials import Credentials, DONT_USE_KERBEROS
 from samba.auth import system_session
 from ldb import SCOPE_BASE, LdbError
 from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS
@@ -22,6 +23,8 @@ from ldb import ERR_OBJECT_CLASS_VIOLATION
 from ldb import ERR_CONSTRAINT_VIOLATION
 from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
 from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS
+from ldb import ERR_INVALID_CREDENTIALS
+from ldb import ERR_STRONG_AUTH_REQUIRED
 from ldb import Message, MessageElement, Dn
 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
 from samba.samdb import SamDB
@@ -47,6 +50,7 @@ from samba.dcerpc import drsuapi
 from samba.dcerpc import security
 from samba.tests import delete_force
 from samba import gensec
+from samba import werror
 
 parser = optparse.OptionParser("sam.py [options] <host>")
 sambaopts = options.SambaOptions(parser)
@@ -1626,6 +1630,179 @@ class SamTests(samba.tests.TestCase):
 
         delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
 
+    def test_ldap_bind_must_change_pwd(self):
+        """Test the error messages for failing LDAP binds"""
+        print "Test the error messages for failing LDAP binds\n"
+
+        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+
+        def format_error_msg(hresult_v, dsid_v, werror_v):
+            #
+            # There are 4 lower case hex digits following 'v' at the end,
+            # but different Windows Versions return different values:
+            #
+            # Windows 2008R2 uses 'v1db1'
+            # Windows 2012R2 uses 'v2580'
+            #
+            return "%08X: LdapErr: DSID-%08X, comment: AcceptSecurityContext error, data %x, v" % (
+                    hresult_v, dsid_v, werror_v)
+
+        HRES_SEC_E_LOGON_DENIED = 0x8009030C
+        HRES_SEC_E_INVALID_TOKEN = 0x80090308
+
+        sasl_bind_dsid = 0x0C0904DC
+        simple_bind_dsid = 0x0C0903A9
+
+        error_msg_sasl_wrong_pw = format_error_msg(
+                                HRES_SEC_E_LOGON_DENIED,
+                                sasl_bind_dsid,
+                                werror.WERR_LOGON_FAILURE)
+        error_msg_sasl_must_change = format_error_msg(
+                                HRES_SEC_E_LOGON_DENIED,
+                                sasl_bind_dsid,
+                                werror.WERR_PASSWORD_MUST_CHANGE)
+        error_msg_simple_wrong_pw = format_error_msg(
+                                HRES_SEC_E_INVALID_TOKEN,
+                                simple_bind_dsid,
+                                werror.WERR_LOGON_FAILURE)
+        error_msg_simple_must_change = format_error_msg(
+                                HRES_SEC_E_INVALID_TOKEN,
+                                simple_bind_dsid,
+                                werror.WERR_PASSWORD_MUST_CHANGE)
+
+        username = "ldaptestuser"
+        password = "thatsAcomplPASS2"
+        utf16pw = unicode('"' + password.encode('utf-8') + '"', 'utf-8').encode('utf-16-le')
+
+        ldb.add({
+            "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
+            "objectclass": "user",
+            "sAMAccountName": username,
+            "userAccountControl": str(UF_NORMAL_ACCOUNT),
+            "unicodePwd": utf16pw,
+            })
+
+        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
+                          scope=SCOPE_BASE,
+                          attrs=["sAMAccountName", "sAMAccountType", "userAccountControl", "pwdLastSet"])
+        self.assertTrue(len(res1) == 1)
+        self.assertEqual(res1[0]["sAMAccountName"][0], username)
+        self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
+        self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT)
+        self.assertNotEqual(int(res1[0]["pwdLastSet"][0]), 0)
+
+        # Open a second LDB connection with the user credentials. Use the
+        # command line credentials for informations like the domain, the realm
+        # and the workstation.
+        sasl_creds = Credentials()
+        sasl_creds.set_username(username)
+        sasl_creds.set_password(password)
+        sasl_creds.set_domain(creds.get_domain())
+        sasl_creds.set_workstation(creds.get_workstation())
+        sasl_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+        sasl_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+        sasl_wrong_creds = Credentials()
+        sasl_wrong_creds.set_username(username)
+        sasl_wrong_creds.set_password("wrong")
+        sasl_wrong_creds.set_domain(creds.get_domain())
+        sasl_wrong_creds.set_workstation(creds.get_workstation())
+        sasl_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+        sasl_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+        simple_creds = Credentials()
+        simple_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn)
+        simple_creds.set_username(username)
+        simple_creds.set_password(password)
+        simple_creds.set_domain(creds.get_domain())
+        simple_creds.set_workstation(creds.get_workstation())
+        simple_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+        simple_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+        simple_wrong_creds = Credentials()
+        simple_wrong_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn)
+        simple_wrong_creds.set_username(username)
+        simple_wrong_creds.set_password("wrong")
+        simple_wrong_creds.set_domain(creds.get_domain())
+        simple_wrong_creds.set_workstation(creds.get_workstation())
+        simple_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+        simple_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+        sasl_ldb = SamDB(url=host, credentials=sasl_creds, lp=lp)
+        self.assertIsNotNone(sasl_ldb)
+        sasl_ldb = None
+
+        requires_strong_auth = False
+        try:
+            simple_ldb = SamDB(url=host, credentials=simple_creds, lp=lp)
+            self.assertIsNotNone(simple_ldb)
+            simple_ldb = None
+        except LdbError, (num, msg):
+            if num != ERR_STRONG_AUTH_REQUIRED:
+                raise
+            requires_strong_auth = True
+
+        def assertLDAPErrorMsg(msg, expected_msg):
+            self.assertTrue(expected_msg in msg,
+                            "msg[%s] does not contain expected[%s]" % (
+                            msg, expected_msg))
+
+        try:
+            ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp)
+            self.fail()
+        except LdbError, (num, msg):
+            self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+            self.assertTrue(error_msg_sasl_wrong_pw in msg)
+
+        if not requires_strong_auth:
+            try:
+                ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp)
+                self.fail()
+            except LdbError, (num, msg):
+                self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+                assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw)
+
+        m = Message()
+        m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+        m["pls1"] = MessageElement(str(0),
+                                   FLAG_MOD_REPLACE,
+                                   "pwdLastSet")
+        ldb.modify(m)
+
+        res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
+                          scope=SCOPE_BASE, attrs=["pwdLastSet"])
+        self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0)
+
+        try:
+            ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp)
+            self.fail()
+        except LdbError, (num, msg):
+            self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+            assertLDAPErrorMsg(msg, error_msg_sasl_wrong_pw)
+
+        try:
+            ldb_fail = SamDB(url=host, credentials=sasl_creds, lp=lp)
+            self.fail()
+        except LdbError, (num, msg):
+            self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+            assertLDAPErrorMsg(msg, error_msg_sasl_must_change)
+
+        if not requires_strong_auth:
+            try:
+                ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp)
+                self.fail()
+            except LdbError, (num, msg):
+                self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+                assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw)
+
+            try:
+                ldb_fail = SamDB(url=host, credentials=simple_creds, lp=lp)
+                self.fail()
+            except LdbError, (num, msg):
+                self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+                assertLDAPErrorMsg(msg, error_msg_simple_must_change)
+
+        delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
 
     def test_userAccountControl(self):
         """Test the userAccountControl behaviour"""