import samba.getopt as options
+from samba.credentials import Credentials, DONT_USE_KERBEROS
from samba.auth import system_session
from ldb import SCOPE_BASE, LdbError
from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS
from ldb import ERR_CONSTRAINT_VIOLATION
from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS
+from ldb import ERR_INVALID_CREDENTIALS
+from ldb import ERR_STRONG_AUTH_REQUIRED
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
from samba.samdb import SamDB
from samba.dcerpc import security
from samba.tests import delete_force
from samba import gensec
+from samba import werror
parser = optparse.OptionParser("sam.py [options] <host>")
sambaopts = options.SambaOptions(parser)
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+ def test_ldap_bind_must_change_pwd(self):
+ """Test the error messages for failing LDAP binds"""
+ print "Test the error messages for failing LDAP binds\n"
+
+ delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+
+ def format_error_msg(hresult_v, dsid_v, werror_v):
+ #
+ # There are 4 lower case hex digits following 'v' at the end,
+ # but different Windows Versions return different values:
+ #
+ # Windows 2008R2 uses 'v1db1'
+ # Windows 2012R2 uses 'v2580'
+ #
+ return "%08X: LdapErr: DSID-%08X, comment: AcceptSecurityContext error, data %x, v" % (
+ hresult_v, dsid_v, werror_v)
+
+ HRES_SEC_E_LOGON_DENIED = 0x8009030C
+ HRES_SEC_E_INVALID_TOKEN = 0x80090308
+
+ sasl_bind_dsid = 0x0C0904DC
+ simple_bind_dsid = 0x0C0903A9
+
+ error_msg_sasl_wrong_pw = format_error_msg(
+ HRES_SEC_E_LOGON_DENIED,
+ sasl_bind_dsid,
+ werror.WERR_LOGON_FAILURE)
+ error_msg_sasl_must_change = format_error_msg(
+ HRES_SEC_E_LOGON_DENIED,
+ sasl_bind_dsid,
+ werror.WERR_PASSWORD_MUST_CHANGE)
+ error_msg_simple_wrong_pw = format_error_msg(
+ HRES_SEC_E_INVALID_TOKEN,
+ simple_bind_dsid,
+ werror.WERR_LOGON_FAILURE)
+ error_msg_simple_must_change = format_error_msg(
+ HRES_SEC_E_INVALID_TOKEN,
+ simple_bind_dsid,
+ werror.WERR_PASSWORD_MUST_CHANGE)
+
+ username = "ldaptestuser"
+ password = "thatsAcomplPASS2"
+ utf16pw = unicode('"' + password.encode('utf-8') + '"', 'utf-8').encode('utf-16-le')
+
+ ldb.add({
+ "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
+ "objectclass": "user",
+ "sAMAccountName": username,
+ "userAccountControl": str(UF_NORMAL_ACCOUNT),
+ "unicodePwd": utf16pw,
+ })
+
+ res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["sAMAccountName", "sAMAccountType", "userAccountControl", "pwdLastSet"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEqual(res1[0]["sAMAccountName"][0], username)
+ self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
+ self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT)
+ self.assertNotEqual(int(res1[0]["pwdLastSet"][0]), 0)
+
+ # Open a second LDB connection with the user credentials. Use the
+ # command line credentials for informations like the domain, the realm
+ # and the workstation.
+ sasl_creds = Credentials()
+ sasl_creds.set_username(username)
+ sasl_creds.set_password(password)
+ sasl_creds.set_domain(creds.get_domain())
+ sasl_creds.set_workstation(creds.get_workstation())
+ sasl_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+ sasl_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+ sasl_wrong_creds = Credentials()
+ sasl_wrong_creds.set_username(username)
+ sasl_wrong_creds.set_password("wrong")
+ sasl_wrong_creds.set_domain(creds.get_domain())
+ sasl_wrong_creds.set_workstation(creds.get_workstation())
+ sasl_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+ sasl_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+ simple_creds = Credentials()
+ simple_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn)
+ simple_creds.set_username(username)
+ simple_creds.set_password(password)
+ simple_creds.set_domain(creds.get_domain())
+ simple_creds.set_workstation(creds.get_workstation())
+ simple_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+ simple_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+ simple_wrong_creds = Credentials()
+ simple_wrong_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn)
+ simple_wrong_creds.set_username(username)
+ simple_wrong_creds.set_password("wrong")
+ simple_wrong_creds.set_domain(creds.get_domain())
+ simple_wrong_creds.set_workstation(creds.get_workstation())
+ simple_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+ simple_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+ sasl_ldb = SamDB(url=host, credentials=sasl_creds, lp=lp)
+ self.assertIsNotNone(sasl_ldb)
+ sasl_ldb = None
+
+ requires_strong_auth = False
+ try:
+ simple_ldb = SamDB(url=host, credentials=simple_creds, lp=lp)
+ self.assertIsNotNone(simple_ldb)
+ simple_ldb = None
+ except LdbError, (num, msg):
+ if num != ERR_STRONG_AUTH_REQUIRED:
+ raise
+ requires_strong_auth = True
+
+ def assertLDAPErrorMsg(msg, expected_msg):
+ self.assertTrue(expected_msg in msg,
+ "msg[%s] does not contain expected[%s]" % (
+ msg, expected_msg))
+
+ try:
+ ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ self.assertTrue(error_msg_sasl_wrong_pw in msg)
+
+ if not requires_strong_auth:
+ try:
+ ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw)
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+ m["pls1"] = MessageElement(str(0),
+ FLAG_MOD_REPLACE,
+ "pwdLastSet")
+ ldb.modify(m)
+
+ res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
+ scope=SCOPE_BASE, attrs=["pwdLastSet"])
+ self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0)
+
+ try:
+ ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ assertLDAPErrorMsg(msg, error_msg_sasl_wrong_pw)
+
+ try:
+ ldb_fail = SamDB(url=host, credentials=sasl_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ assertLDAPErrorMsg(msg, error_msg_sasl_must_change)
+
+ if not requires_strong_auth:
+ try:
+ ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw)
+
+ try:
+ ldb_fail = SamDB(url=host, credentials=simple_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ assertLDAPErrorMsg(msg, error_msg_simple_must_change)
+
+ delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
def test_userAccountControl(self):
"""Test the userAccountControl behaviour"""