import os
from ldb import SCOPE_SUBTREE
-from samba import gensec
+from samba import NTSTATUSError, gensec
from samba.auth import AuthContext
from samba.dcerpc import security
from samba.ndr import ndr_unpack
+from samba.ntstatus import NT_STATUS_ACCESS_DENIED
from samba.tests.krb5.kdc_base_test import KDCBaseTest
"""
def test_ccache(self):
+ self._run_ccache_test("ccacheusr")
+
+ def test_ccache_no_pac(self):
+ self._run_ccache_test("ccacheusr_nopac", include_pac=False,
+ expect_anon=True, allow_error=True)
+
+ def _run_ccache_test(self, user_name, include_pac=True,
+ expect_anon=False, allow_error=False):
# Create a user account and a machine account, along with a Kerberos
# credentials cache file where the service ticket authenticating the
# user are stored.
- user_name = "ccacheusr"
mach_name = "ccachemac"
service = "host"
# ticket, to ensure that the krbtgt ticket doesn't also need to be
# stored.
(creds, cachefile) = self.create_ccache_with_user(user_credentials,
- mach_credentials)
+ mach_credentials,
+ pac=include_pac)
+ # Remove the cached credentials file.
+ self.addCleanup(os.remove, cachefile.name)
# Authenticate in-process to the machine account using the user's
# cached credentials.
sid = ndr_unpack(security.dom_sid, ldb_res[0]["objectSid"][0])
# Retrieve the SIDs from the security token.
- session = gensec_server.session_info()
+ try:
+ session = gensec_server.session_info()
+ except NTSTATUSError as e:
+ if not allow_error:
+ self.fail()
+
+ enum, _ = e.args
+ self.assertEqual(NT_STATUS_ACCESS_DENIED, enum)
+ return
+
token = session.security_token
token_sids = token.sids
self.assertGreater(len(token_sids), 0)
# Ensure that they match.
self.assertEqual(sid, token_sids[0])
- # Remove the cached credentials file.
- os.remove(cachefile.name)
-
if __name__ == "__main__":
global_asn1_print = False
import sys
import os
-from ldb import SCOPE_BASE, SCOPE_SUBTREE
+from ldb import LdbError, ERR_OPERATIONS_ERROR, SCOPE_BASE, SCOPE_SUBTREE
from samba.dcerpc import security
from samba.ndr import ndr_unpack
from samba.samdb import SamDB
+from samba import credentials
from samba.tests.krb5.kdc_base_test import KDCBaseTest
"""
def test_ldap(self):
+ self._run_ldap_test("ldapusr")
+
+ def test_ldap_no_pac(self):
+ self._run_ldap_test("ldapusr_nopac", include_pac=False,
+ expect_anon=True, allow_error=True)
+
+ def _run_ldap_test(self, user_name, include_pac=True,
+ expect_anon=False, allow_error=False):
# Create a user account and a machine account, along with a Kerberos
# credentials cache file where the service ticket authenticating the
# user are stored.
samdb = self.get_samdb()
- user_name = "ldapusr"
mach_name = samdb.host_dns_name()
service = "ldap"
(creds, cachefile) = self.create_ccache_with_user(user_credentials,
mach_credentials,
service,
- mach_name)
+ mach_name,
+ pac=include_pac)
+ # Remove the cached credentials file.
+ self.addCleanup(os.remove, cachefile.name)
# Authenticate in-process to the machine account using the user's
# cached credentials.
self.assertEqual(1, len(ldb_res))
sid = ndr_unpack(security.dom_sid, ldb_res[0]["objectSid"][0])
+ # Connect to the machine account and retrieve the user SID.
+ try:
+ ldb_as_user = SamDB(url="ldap://%s" % mach_name,
+ credentials=creds,
+ lp=self.get_lp())
+ except LdbError as e:
+ if not allow_error:
+ self.fail()
+
+ enum, estr = e.args
+ self.assertEqual(ERR_OPERATIONS_ERROR, enum)
+ self.assertIn('NT_STATUS_ACCESS_DENIED', estr)
+ return
+
+ ldb_res = ldb_as_user.search('',
+ scope=SCOPE_BASE,
+ attrs=["tokenGroups"])
+ self.assertEqual(1, len(ldb_res))
+
+ token_groups = ldb_res[0]["tokenGroups"]
+ token_sid = ndr_unpack(security.dom_sid, token_groups[0])
+
+ if expect_anon:
+ # Ensure we got an anonymous token.
+ self.assertEqual(security.SID_NT_ANONYMOUS, str(token_sid))
+ token_sid = ndr_unpack(security.dom_sid, token_groups[1])
+ self.assertEqual(security.SID_NT_NETWORK, str(token_sid))
+ if len(token_groups) >= 3:
+ token_sid = ndr_unpack(security.dom_sid, token_groups[2])
+ self.assertEqual(security.SID_NT_THIS_ORGANISATION,
+ str(token_sid))
+ else:
+ # Ensure that they match.
+ self.assertEqual(sid, token_sid)
+
+ def test_ldap_anonymous(self):
+ samdb = self.get_samdb()
+ mach_name = samdb.host_dns_name()
+
+ anon_creds = credentials.Credentials()
+ anon_creds.set_anonymous()
+
# Connect to the machine account and retrieve the user SID.
ldb_as_user = SamDB(url="ldap://%s" % mach_name,
- credentials=creds,
+ credentials=anon_creds,
lp=self.get_lp())
ldb_res = ldb_as_user.search('',
scope=SCOPE_BASE,
attrs=["tokenGroups"])
self.assertEqual(1, len(ldb_res))
+ # Ensure we got an anonymous token.
token_sid = ndr_unpack(security.dom_sid, ldb_res[0]["tokenGroups"][0])
-
- # Ensure that they match.
- self.assertEqual(sid, token_sid)
-
- # Remove the cached credentials file.
- os.remove(cachefile.name)
+ self.assertEqual(security.SID_NT_ANONYMOUS, str(token_sid))
+ self.assertEqual(len(ldb_res[0]["tokenGroups"]), 1)
if __name__ == "__main__":
import sys
import os
+from samba import NTSTATUSError, credentials
from samba.dcerpc import lsa
+from samba.ntstatus import NT_STATUS_ACCESS_DENIED
from samba.tests.krb5.kdc_base_test import KDCBaseTest
"""
def test_rpc(self):
+ self._run_rpc_test("rpcusr")
+
+ def test_rpc_no_pac(self):
+ self._run_rpc_test("rpcusr_nopac", include_pac=False,
+ expect_anon=True, allow_error=True)
+
+ def _run_rpc_test(self, user_name, include_pac=True,
+ expect_anon=False, allow_error=False):
# Create a user account and a machine account, along with a Kerberos
# credentials cache file where the service ticket authenticating the
# user are stored.
samdb = self.get_samdb()
- user_name = "rpcusr"
mach_name = samdb.host_dns_name()
service = "cifs"
(creds, cachefile) = self.create_ccache_with_user(user_credentials,
mach_credentials,
service,
- mach_name)
+ mach_name,
+ pac=include_pac)
+ # Remove the cached credentials file.
+ self.addCleanup(os.remove, cachefile.name)
# Authenticate in-process to the machine account using the user's
# cached credentials.
binding_str = "ncacn_np:%s[\\pipe\\lsarpc]" % mach_name
- conn = lsa.lsarpc(binding_str, self.get_lp(), creds)
+ try:
+ conn = lsa.lsarpc(binding_str, self.get_lp(), creds)
+ except NTSTATUSError as e:
+ if not allow_error:
+ self.fail()
+
+ enum, _ = e.args
+ self.assertEqual(NT_STATUS_ACCESS_DENIED, enum)
+ return
(account_name, _) = conn.GetUserName(None, None, None)
- self.assertEqual(user_name, account_name.string)
+ if expect_anon:
+ self.assertNotEqual(user_name, account_name.string)
+ else:
+ self.assertEqual(user_name, account_name.string)
- # Remove the cached credentials file.
- os.remove(cachefile.name)
+ def test_rpc_anonymous(self):
+ samdb = self.get_samdb()
+ mach_name = samdb.host_dns_name()
+
+ anon_creds = credentials.Credentials()
+ anon_creds.set_anonymous()
+
+ binding_str = "ncacn_np:%s[\\pipe\\lsarpc]" % mach_name
+ conn = lsa.lsarpc(binding_str, self.get_lp(), anon_creds)
+
+ (account_name, _) = conn.GetUserName(None, None, None)
+
+ self.assertEqual('ANONYMOUS LOGON', account_name.string)
if __name__ == "__main__":
import os
from ldb import SCOPE_SUBTREE
+from samba import NTSTATUSError
from samba.dcerpc import security
from samba.ndr import ndr_unpack
+from samba.ntstatus import NT_STATUS_ACCESS_DENIED
from samba.samba3 import libsmb_samba_internal as libsmb
from samba.samba3 import param as s3param
"""
def test_smb(self):
+ self._run_smb_test("smbusr")
+
+ def test_smb_no_pac(self):
+ self._run_smb_test("smbusr_nopac", include_pac=False,
+ expect_error=True)
+
+ def _run_smb_test(self, user_name, include_pac=True,
+ expect_error=False):
# Create a user account and a machine account, along with a Kerberos
# credentials cache file where the service ticket authenticating the
# user are stored.
samdb = self.get_samdb()
- user_name = "smbusr"
mach_name = samdb.host_dns_name()
service = "cifs"
share = "tmp"
(creds, cachefile) = self.create_ccache_with_user(user_credentials,
mach_credentials,
service,
- mach_name)
+ mach_name,
+ pac=include_pac)
+ # Remove the cached credentials file.
+ self.addCleanup(os.remove, cachefile.name)
# Set the Kerberos 5 credentials cache environment variable. This is
# required because the codepath that gets run (gse_krb5) looks for it
self.addCleanup(s3_lp.set, "client max protocol", max_protocol)
s3_lp.set("client max protocol", "NT1")
- conn = libsmb.Conn(mach_name, share, lp=s3_lp, creds=creds)
+ try:
+ conn = libsmb.Conn(mach_name, share, lp=s3_lp, creds=creds)
+ except NTSTATUSError as e:
+ if not expect_error:
+ self.fail()
+
+ enum, _ = e.args
+ self.assertEqual(NT_STATUS_ACCESS_DENIED, enum)
+ return
+ else:
+ self.assertFalse(expect_error)
(uid, gid, gids, sids, guest) = conn.posix_whoami()
# Ensure that they match.
self.assertEqual(sid, sids[0])
- # Remove the cached credentials file.
- os.remove(cachefile.name)
-
if __name__ == "__main__":
global_asn1_print = False
'FAST_SUPPORT': have_fast_support,
'TKT_SIG_SUPPORT': tkt_sig_support
})
-planoldpythontestsuite("ad_dc_default", "samba.tests.krb5.test_rpc",
- environ={
- 'ADMIN_USERNAME': '$USERNAME',
- 'ADMIN_PASSWORD': '$PASSWORD',
- 'STRICT_CHECKING': '0',
- 'FAST_SUPPORT': have_fast_support,
- 'TKT_SIG_SUPPORT': tkt_sig_support
- })
+for env in ['ad_dc_default', 'ad_member']:
+ planoldpythontestsuite(env, "samba.tests.krb5.test_rpc",
+ environ={
+ 'ADMIN_USERNAME': '$DC_USERNAME',
+ 'ADMIN_PASSWORD': '$DC_PASSWORD',
+ 'STRICT_CHECKING': '0',
+ 'FAST_SUPPORT': have_fast_support,
+ 'TKT_SIG_SUPPORT': tkt_sig_support
+ })
planoldpythontestsuite("ad_dc_smb1", "samba.tests.krb5.test_smb",
environ={
'ADMIN_USERNAME': '$USERNAME',