]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
CVE-2020-25719 CVE-2020-25717 tests/krb5: Add tests for connecting to services anonym...
authorStefan Metzmacher <metze@samba.org>
Tue, 24 Aug 2021 15:11:24 +0000 (17:11 +0200)
committerJule Anger <janger@samba.org>
Mon, 8 Nov 2021 09:52:10 +0000 (10:52 +0100)
At the end of the patchset we assume NT_STATUS_NO_IMPERSONATION_TOKEN if
no PAC is available.

For now we want to look for ACCESS_DENIED as this allows
the test to pass (showing that gensec:require_pac = true
is a useful partial mitigation).

This will also help others doing backports that do not
take the full patch set.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14801
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14799
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14561
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14556

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/krb5/test_ccache.py
python/samba/tests/krb5/test_ldap.py
python/samba/tests/krb5/test_rpc.py
python/samba/tests/krb5/test_smb.py
source4/selftest/tests.py

index 040ae5cc9a1f57e68d9a5bec2037797751b4d830..cb5061b92d9c5f8f623295629c2022253c51f943 100755 (executable)
@@ -21,10 +21,11 @@ import sys
 import os
 
 from ldb import SCOPE_SUBTREE
-from samba import gensec
+from samba import NTSTATUSError, gensec
 from samba.auth import AuthContext
 from samba.dcerpc import security
 from samba.ndr import ndr_unpack
+from samba.ntstatus import NT_STATUS_ACCESS_DENIED
 
 from samba.tests.krb5.kdc_base_test import KDCBaseTest
 
@@ -41,11 +42,18 @@ class CcacheTests(KDCBaseTest):
     """
 
     def test_ccache(self):
+        self._run_ccache_test("ccacheusr")
+
+    def test_ccache_no_pac(self):
+        self._run_ccache_test("ccacheusr_nopac", include_pac=False,
+                              expect_anon=True, allow_error=True)
+
+    def _run_ccache_test(self, user_name, include_pac=True,
+                         expect_anon=False, allow_error=False):
         # Create a user account and a machine account, along with a Kerberos
         # credentials cache file where the service ticket authenticating the
         # user are stored.
 
-        user_name = "ccacheusr"
         mach_name = "ccachemac"
         service = "host"
 
@@ -67,7 +75,10 @@ class CcacheTests(KDCBaseTest):
         # ticket, to ensure that the krbtgt ticket doesn't also need to be
         # stored.
         (creds, cachefile) = self.create_ccache_with_user(user_credentials,
-                                                          mach_credentials)
+                                                          mach_credentials,
+                                                          pac=include_pac)
+        # Remove the cached credentials file.
+        self.addCleanup(os.remove, cachefile.name)
 
         # Authenticate in-process to the machine account using the user's
         # cached credentials.
@@ -117,7 +128,16 @@ class CcacheTests(KDCBaseTest):
         sid = ndr_unpack(security.dom_sid, ldb_res[0]["objectSid"][0])
 
         # Retrieve the SIDs from the security token.
-        session = gensec_server.session_info()
+        try:
+            session = gensec_server.session_info()
+        except NTSTATUSError as e:
+            if not allow_error:
+                self.fail()
+
+            enum, _ = e.args
+            self.assertEqual(NT_STATUS_ACCESS_DENIED, enum)
+            return
+
         token = session.security_token
         token_sids = token.sids
         self.assertGreater(len(token_sids), 0)
@@ -125,9 +145,6 @@ class CcacheTests(KDCBaseTest):
         # Ensure that they match.
         self.assertEqual(sid, token_sids[0])
 
-        # Remove the cached credentials file.
-        os.remove(cachefile.name)
-
 
 if __name__ == "__main__":
     global_asn1_print = False
index 7d9ffebe2985fbb7e544a5324b42c4403c31a1d5..31e50487338b02a0da9290fb5f8ffa114417942a 100755 (executable)
 import sys
 import os
 
-from ldb import SCOPE_BASE, SCOPE_SUBTREE
+from ldb import LdbError, ERR_OPERATIONS_ERROR, SCOPE_BASE, SCOPE_SUBTREE
 from samba.dcerpc import security
 from samba.ndr import ndr_unpack
 from samba.samdb import SamDB
+from samba import credentials
 
 from samba.tests.krb5.kdc_base_test import KDCBaseTest
 
@@ -40,13 +41,20 @@ class LdapTests(KDCBaseTest):
     """
 
     def test_ldap(self):
+        self._run_ldap_test("ldapusr")
+
+    def test_ldap_no_pac(self):
+        self._run_ldap_test("ldapusr_nopac", include_pac=False,
+                            expect_anon=True, allow_error=True)
+
+    def _run_ldap_test(self, user_name, include_pac=True,
+                       expect_anon=False, allow_error=False):
         # Create a user account and a machine account, along with a Kerberos
         # credentials cache file where the service ticket authenticating the
         # user are stored.
 
         samdb = self.get_samdb()
 
-        user_name = "ldapusr"
         mach_name = samdb.host_dns_name()
         service = "ldap"
 
@@ -62,7 +70,10 @@ class LdapTests(KDCBaseTest):
         (creds, cachefile) = self.create_ccache_with_user(user_credentials,
                                                           mach_credentials,
                                                           service,
-                                                          mach_name)
+                                                          mach_name,
+                                                          pac=include_pac)
+        # Remove the cached credentials file.
+        self.addCleanup(os.remove, cachefile.name)
 
         # Authenticate in-process to the machine account using the user's
         # cached credentials.
@@ -74,22 +85,61 @@ class LdapTests(KDCBaseTest):
         self.assertEqual(1, len(ldb_res))
         sid = ndr_unpack(security.dom_sid, ldb_res[0]["objectSid"][0])
 
+        # Connect to the machine account and retrieve the user SID.
+        try:
+            ldb_as_user = SamDB(url="ldap://%s" % mach_name,
+                                credentials=creds,
+                                lp=self.get_lp())
+        except LdbError as e:
+            if not allow_error:
+                self.fail()
+
+            enum, estr = e.args
+            self.assertEqual(ERR_OPERATIONS_ERROR, enum)
+            self.assertIn('NT_STATUS_ACCESS_DENIED', estr)
+            return
+
+        ldb_res = ldb_as_user.search('',
+                                     scope=SCOPE_BASE,
+                                     attrs=["tokenGroups"])
+        self.assertEqual(1, len(ldb_res))
+
+        token_groups = ldb_res[0]["tokenGroups"]
+        token_sid = ndr_unpack(security.dom_sid, token_groups[0])
+
+        if expect_anon:
+            # Ensure we got an anonymous token.
+            self.assertEqual(security.SID_NT_ANONYMOUS, str(token_sid))
+            token_sid = ndr_unpack(security.dom_sid, token_groups[1])
+            self.assertEqual(security.SID_NT_NETWORK, str(token_sid))
+            if len(token_groups) >= 3:
+                token_sid = ndr_unpack(security.dom_sid, token_groups[2])
+                self.assertEqual(security.SID_NT_THIS_ORGANISATION,
+                                 str(token_sid))
+        else:
+            # Ensure that they match.
+            self.assertEqual(sid, token_sid)
+
+    def test_ldap_anonymous(self):
+        samdb = self.get_samdb()
+        mach_name = samdb.host_dns_name()
+
+        anon_creds = credentials.Credentials()
+        anon_creds.set_anonymous()
+
         # Connect to the machine account and retrieve the user SID.
         ldb_as_user = SamDB(url="ldap://%s" % mach_name,
-                            credentials=creds,
+                            credentials=anon_creds,
                             lp=self.get_lp())
         ldb_res = ldb_as_user.search('',
                                      scope=SCOPE_BASE,
                                      attrs=["tokenGroups"])
         self.assertEqual(1, len(ldb_res))
 
+        # Ensure we got an anonymous token.
         token_sid = ndr_unpack(security.dom_sid, ldb_res[0]["tokenGroups"][0])
-
-        # Ensure that they match.
-        self.assertEqual(sid, token_sid)
-
-        # Remove the cached credentials file.
-        os.remove(cachefile.name)
+        self.assertEqual(security.SID_NT_ANONYMOUS, str(token_sid))
+        self.assertEqual(len(ldb_res[0]["tokenGroups"]), 1)
 
 
 if __name__ == "__main__":
index ef8dd4dcbf5bd8f5e60410e9c816f8bbafa36760..54ad7cf0e481227e7c31ec98cdbe55bd2fe85ca4 100755 (executable)
@@ -20,7 +20,9 @@
 import sys
 import os
 
+from samba import NTSTATUSError, credentials
 from samba.dcerpc import lsa
+from samba.ntstatus import NT_STATUS_ACCESS_DENIED
 
 from samba.tests.krb5.kdc_base_test import KDCBaseTest
 
@@ -37,13 +39,20 @@ class RpcTests(KDCBaseTest):
     """
 
     def test_rpc(self):
+        self._run_rpc_test("rpcusr")
+
+    def test_rpc_no_pac(self):
+        self._run_rpc_test("rpcusr_nopac", include_pac=False,
+                           expect_anon=True, allow_error=True)
+
+    def _run_rpc_test(self, user_name, include_pac=True,
+                      expect_anon=False, allow_error=False):
         # Create a user account and a machine account, along with a Kerberos
         # credentials cache file where the service ticket authenticating the
         # user are stored.
 
         samdb = self.get_samdb()
 
-        user_name = "rpcusr"
         mach_name = samdb.host_dns_name()
         service = "cifs"
 
@@ -59,20 +68,45 @@ class RpcTests(KDCBaseTest):
         (creds, cachefile) = self.create_ccache_with_user(user_credentials,
                                                           mach_credentials,
                                                           service,
-                                                          mach_name)
+                                                          mach_name,
+                                                          pac=include_pac)
+        # Remove the cached credentials file.
+        self.addCleanup(os.remove, cachefile.name)
 
         # Authenticate in-process to the machine account using the user's
         # cached credentials.
 
         binding_str = "ncacn_np:%s[\\pipe\\lsarpc]" % mach_name
-        conn = lsa.lsarpc(binding_str, self.get_lp(), creds)
+        try:
+            conn = lsa.lsarpc(binding_str, self.get_lp(), creds)
+        except NTSTATUSError as e:
+            if not allow_error:
+                self.fail()
+
+            enum, _ = e.args
+            self.assertEqual(NT_STATUS_ACCESS_DENIED, enum)
+            return
 
         (account_name, _) = conn.GetUserName(None, None, None)
 
-        self.assertEqual(user_name, account_name.string)
+        if expect_anon:
+            self.assertNotEqual(user_name, account_name.string)
+        else:
+            self.assertEqual(user_name, account_name.string)
 
-        # Remove the cached credentials file.
-        os.remove(cachefile.name)
+    def test_rpc_anonymous(self):
+        samdb = self.get_samdb()
+        mach_name = samdb.host_dns_name()
+
+        anon_creds = credentials.Credentials()
+        anon_creds.set_anonymous()
+
+        binding_str = "ncacn_np:%s[\\pipe\\lsarpc]" % mach_name
+        conn = lsa.lsarpc(binding_str, self.get_lp(), anon_creds)
+
+        (account_name, _) = conn.GetUserName(None, None, None)
+
+        self.assertEqual('ANONYMOUS LOGON', account_name.string)
 
 
 if __name__ == "__main__":
index 1e70ed322bfc781d1170b03e85f9699fd222f4e0..79ff16ac8795cc1021c5ce6ffd47349a7cc51cc8 100755 (executable)
@@ -21,8 +21,10 @@ import sys
 import os
 
 from ldb import SCOPE_SUBTREE
+from samba import NTSTATUSError
 from samba.dcerpc import security
 from samba.ndr import ndr_unpack
+from samba.ntstatus import NT_STATUS_ACCESS_DENIED
 from samba.samba3 import libsmb_samba_internal as libsmb
 from samba.samba3 import param as s3param
 
@@ -41,13 +43,20 @@ class SmbTests(KDCBaseTest):
     """
 
     def test_smb(self):
+        self._run_smb_test("smbusr")
+
+    def test_smb_no_pac(self):
+        self._run_smb_test("smbusr_nopac", include_pac=False,
+                           expect_error=True)
+
+    def _run_smb_test(self, user_name, include_pac=True,
+                      expect_error=False):
         # Create a user account and a machine account, along with a Kerberos
         # credentials cache file where the service ticket authenticating the
         # user are stored.
 
         samdb = self.get_samdb()
 
-        user_name = "smbusr"
         mach_name = samdb.host_dns_name()
         service = "cifs"
         share = "tmp"
@@ -64,7 +73,10 @@ class SmbTests(KDCBaseTest):
         (creds, cachefile) = self.create_ccache_with_user(user_credentials,
                                                           mach_credentials,
                                                           service,
-                                                          mach_name)
+                                                          mach_name,
+                                                          pac=include_pac)
+        # Remove the cached credentials file.
+        self.addCleanup(os.remove, cachefile.name)
 
         # Set the Kerberos 5 credentials cache environment variable. This is
         # required because the codepath that gets run (gse_krb5) looks for it
@@ -95,16 +107,23 @@ class SmbTests(KDCBaseTest):
         self.addCleanup(s3_lp.set, "client max protocol", max_protocol)
         s3_lp.set("client max protocol", "NT1")
 
-        conn = libsmb.Conn(mach_name, share, lp=s3_lp, creds=creds)
+        try:
+            conn = libsmb.Conn(mach_name, share, lp=s3_lp, creds=creds)
+        except NTSTATUSError as e:
+            if not expect_error:
+                self.fail()
+
+            enum, _ = e.args
+            self.assertEqual(NT_STATUS_ACCESS_DENIED, enum)
+            return
+        else:
+            self.assertFalse(expect_error)
 
         (uid, gid, gids, sids, guest) = conn.posix_whoami()
 
         # Ensure that they match.
         self.assertEqual(sid, sids[0])
 
-        # Remove the cached credentials file.
-        os.remove(cachefile.name)
-
 
 if __name__ == "__main__":
     global_asn1_print = False
index 53721d1afda6ea3b55a09580522495d84b7cd753..bd68094436fbbc7cad40522d4a0c9206424752cc 100755 (executable)
@@ -828,14 +828,15 @@ planoldpythontestsuite("ad_dc_default", "samba.tests.krb5.test_ldap",
                            'FAST_SUPPORT': have_fast_support,
                            'TKT_SIG_SUPPORT': tkt_sig_support
                        })
-planoldpythontestsuite("ad_dc_default", "samba.tests.krb5.test_rpc",
-                       environ={
-                           'ADMIN_USERNAME': '$USERNAME',
-                           'ADMIN_PASSWORD': '$PASSWORD',
-                           'STRICT_CHECKING': '0',
-                           'FAST_SUPPORT': have_fast_support,
-                           'TKT_SIG_SUPPORT': tkt_sig_support
-                       })
+for env in ['ad_dc_default', 'ad_member']:
+    planoldpythontestsuite(env, "samba.tests.krb5.test_rpc",
+                           environ={
+                               'ADMIN_USERNAME': '$DC_USERNAME',
+                               'ADMIN_PASSWORD': '$DC_PASSWORD',
+                               'STRICT_CHECKING': '0',
+                               'FAST_SUPPORT': have_fast_support,
+                               'TKT_SIG_SUPPORT': tkt_sig_support
+                           })
 planoldpythontestsuite("ad_dc_smb1", "samba.tests.krb5.test_smb",
                        environ={
                            'ADMIN_USERNAME': '$USERNAME',