]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
CVE-2022-32743 tests/py_credentials: Add tests for setting dNSHostName with LogonGetD...
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Tue, 7 Jun 2022 05:35:35 +0000 (17:35 +1200)
committerDouglas Bagnall <dbagnall@samba.org>
Thu, 28 Jul 2022 22:47:37 +0000 (22:47 +0000)
Test that the value is properly validated, and that it can be set
regardless of rights on the account.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14833

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
python/samba/tests/py_credentials.py
selftest/knownfail.d/netlogon-dns-host-name [new file with mode: 0644]

index ecb8271b5953dcbee5637a761a1c57ff89589118..0c442b81f3fcd4db5f67b6fc3b090f5d1cfff759 100644 (file)
@@ -18,6 +18,8 @@
 from samba.tests import TestCase, delete_force
 import os
 
+import ldb
+
 import samba
 from samba.auth import system_session
 from samba.credentials import (
@@ -25,7 +27,7 @@ from samba.credentials import (
     CLI_CRED_NTLMv2_AUTH,
     CLI_CRED_NTLM_AUTH,
     DONT_USE_KERBEROS)
-from samba.dcerpc import netlogon, ntlmssp, srvsvc
+from samba.dcerpc import lsa, netlogon, ntlmssp, security, srvsvc
 from samba.dcerpc.netlogon import (
     netr_Authenticator,
     netr_WorkstationInformation,
@@ -36,10 +38,11 @@ from samba.dsdb import (
     UF_WORKSTATION_TRUST_ACCOUNT,
     UF_PASSWD_NOTREQD,
     UF_NORMAL_ACCOUNT)
-from samba.ndr import ndr_pack
+from samba.ndr import ndr_pack, ndr_unpack
 from samba.samdb import SamDB
 from samba import NTSTATUSError, ntstatus
 from samba.common import get_string
+from samba.sd_utils import SDUtils
 
 import ctypes
 
@@ -105,6 +108,280 @@ class PyCredentialsTests(TestCase):
         (authenticator, subsequent) = self.get_authenticator(c)
         self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
 
+    # Test using LogonGetDomainInfo to update dNSHostName to an allowed value.
+    def test_set_dns_hostname_valid(self):
+        c = self.get_netlogon_connection()
+        authenticator, subsequent = self.get_authenticator(c)
+
+        domain_hostname = self.ldb.domain_dns_name()
+
+        new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
+        new_dns_hostname = new_dns_hostname.encode('utf-8')
+
+        query = netr_WorkstationInformation()
+        query.os_name = lsa.String('some OS')
+        query.dns_hostname = new_dns_hostname
+
+        c.netr_LogonGetDomainInfo(
+            server_name=self.server,
+            computer_name=self.user_creds.get_workstation(),
+            credential=authenticator,
+            return_authenticator=subsequent,
+            level=1,
+            query=query)
+
+        # Check the result.
+
+        res = self.ldb.search(self.machine_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['dNSHostName'])
+        self.assertEqual(1, len(res))
+
+        got_dns_hostname = res[0].get('dNSHostName', idx=0)
+        self.assertEqual(new_dns_hostname, got_dns_hostname)
+
+    # Test using LogonGetDomainInfo to update dNSHostName to an allowed value,
+    # when we are denied the right to do so.
+    def test_set_dns_hostname_valid_denied(self):
+        c = self.get_netlogon_connection()
+        authenticator, subsequent = self.get_authenticator(c)
+
+        res = self.ldb.search(self.machine_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['objectSid'])
+        self.assertEqual(1, len(res))
+
+        machine_sid = ndr_unpack(security.dom_sid,
+                                 res[0].get('objectSid', idx=0))
+
+        sd_utils = SDUtils(self.ldb)
+
+        # Deny Validated Write and Write Property.
+        mod = (f'(OD;;SWWP;{security.GUID_DRS_DNS_HOST_NAME};;'
+               f'{machine_sid})')
+        sd_utils.dacl_add_ace(self.machine_dn, mod)
+
+        domain_hostname = self.ldb.domain_dns_name()
+
+        new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
+        new_dns_hostname = new_dns_hostname.encode('utf-8')
+
+        query = netr_WorkstationInformation()
+        query.os_name = lsa.String('some OS')
+        query.dns_hostname = new_dns_hostname
+
+        c.netr_LogonGetDomainInfo(
+            server_name=self.server,
+            computer_name=self.user_creds.get_workstation(),
+            credential=authenticator,
+            return_authenticator=subsequent,
+            level=1,
+            query=query)
+
+        # Check the result.
+
+        res = self.ldb.search(self.machine_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['dNSHostName'])
+        self.assertEqual(1, len(res))
+
+        got_dns_hostname = res[0].get('dNSHostName', idx=0)
+        self.assertEqual(new_dns_hostname, got_dns_hostname)
+
+    # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
+    # invalid value, even with Validated Write.
+    def test_set_dns_hostname_invalid_validated_write(self):
+        c = self.get_netlogon_connection()
+        authenticator, subsequent = self.get_authenticator(c)
+
+        res = self.ldb.search(self.machine_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['objectSid'])
+        self.assertEqual(1, len(res))
+
+        machine_sid = ndr_unpack(security.dom_sid,
+                                 res[0].get('objectSid', idx=0))
+
+        sd_utils = SDUtils(self.ldb)
+
+        # Grant Validated Write.
+        mod = (f'(OA;;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
+               f'{machine_sid})')
+        sd_utils.dacl_add_ace(self.machine_dn, mod)
+
+        new_dns_hostname = b'invalid'
+
+        query = netr_WorkstationInformation()
+        query.os_name = lsa.String('some OS')
+        query.dns_hostname = new_dns_hostname
+
+        c.netr_LogonGetDomainInfo(
+            server_name=self.server,
+            computer_name=self.user_creds.get_workstation(),
+            credential=authenticator,
+            return_authenticator=subsequent,
+            level=1,
+            query=query)
+
+        # Check the result.
+
+        res = self.ldb.search(self.machine_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['dNSHostName'])
+        self.assertEqual(1, len(res))
+
+        got_dns_hostname = res[0].get('dNSHostName', idx=0)
+        self.assertIsNone(got_dns_hostname)
+
+    # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
+    # invalid value, even with Write Property.
+    def test_set_dns_hostname_invalid_write_property(self):
+        c = self.get_netlogon_connection()
+        authenticator, subsequent = self.get_authenticator(c)
+
+        res = self.ldb.search(self.machine_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['objectSid'])
+        self.assertEqual(1, len(res))
+
+        machine_sid = ndr_unpack(security.dom_sid,
+                                 res[0].get('objectSid', idx=0))
+
+        sd_utils = SDUtils(self.ldb)
+
+        # Grant Write Property.
+        mod = (f'(OA;;WP;{security.GUID_DRS_DNS_HOST_NAME};;'
+               f'{machine_sid})')
+        sd_utils.dacl_add_ace(self.machine_dn, mod)
+
+        new_dns_hostname = b'invalid'
+
+        query = netr_WorkstationInformation()
+        query.os_name = lsa.String('some OS')
+        query.dns_hostname = new_dns_hostname
+
+        c.netr_LogonGetDomainInfo(
+            server_name=self.server,
+            computer_name=self.user_creds.get_workstation(),
+            credential=authenticator,
+            return_authenticator=subsequent,
+            level=1,
+            query=query)
+
+        # Check the result.
+
+        res = self.ldb.search(self.machine_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['dNSHostName'])
+        self.assertEqual(1, len(res))
+
+        got_dns_hostname = res[0].get('dNSHostName', idx=0)
+        self.assertIsNone(got_dns_hostname)
+
+    # Show we can't use LogonGetDomainInfo to set the dNSHostName to just the
+    # machine name.
+    def test_set_dns_hostname_to_machine_name(self):
+        c = self.get_netlogon_connection()
+        authenticator, subsequent = self.get_authenticator(c)
+
+        new_dns_hostname = self.machine_name.encode('utf-8')
+
+        query = netr_WorkstationInformation()
+        query.os_name = lsa.String('some OS')
+        query.dns_hostname = new_dns_hostname
+
+        c.netr_LogonGetDomainInfo(
+            server_name=self.server,
+            computer_name=self.user_creds.get_workstation(),
+            credential=authenticator,
+            return_authenticator=subsequent,
+            level=1,
+            query=query)
+
+        # Check the result.
+
+        res = self.ldb.search(self.machine_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['dNSHostName'])
+        self.assertEqual(1, len(res))
+
+        got_dns_hostname = res[0].get('dNSHostName', idx=0)
+        self.assertIsNone(got_dns_hostname)
+
+    # Show we can't use LogonGetDomainInfo to set dNSHostName with an invalid
+    # suffix.
+    def test_set_dns_hostname_invalid_suffix(self):
+        c = self.get_netlogon_connection()
+        authenticator, subsequent = self.get_authenticator(c)
+
+        domain_hostname = self.ldb.domain_dns_name()
+
+        new_dns_hostname = f'{self.machine_name}.foo.{domain_hostname}'
+        new_dns_hostname = new_dns_hostname.encode('utf-8')
+
+        query = netr_WorkstationInformation()
+        query.os_name = lsa.String('some OS')
+        query.dns_hostname = new_dns_hostname
+
+        c.netr_LogonGetDomainInfo(
+            server_name=self.server,
+            computer_name=self.user_creds.get_workstation(),
+            credential=authenticator,
+            return_authenticator=subsequent,
+            level=1,
+            query=query)
+
+        # Check the result.
+
+        res = self.ldb.search(self.machine_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['dNSHostName'])
+        self.assertEqual(1, len(res))
+
+        got_dns_hostname = res[0].get('dNSHostName', idx=0)
+        self.assertIsNone(got_dns_hostname)
+
+    # Test that setting the HANDLES_SPN_UPDATE flag inhibits the dNSHostName
+    # update, but other attributes are still updated.
+    def test_set_dns_hostname_with_flag(self):
+        c = self.get_netlogon_connection()
+        authenticator, subsequent = self.get_authenticator(c)
+
+        domain_hostname = self.ldb.domain_dns_name()
+
+        new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
+        new_dns_hostname = new_dns_hostname.encode('utf-8')
+
+        operating_system = 'some OS'
+
+        query = netr_WorkstationInformation()
+        query.os_name = lsa.String(operating_system)
+
+        query.dns_hostname = new_dns_hostname
+        query.workstation_flags = netlogon.NETR_WS_FLAG_HANDLES_SPN_UPDATE
+
+        c.netr_LogonGetDomainInfo(
+            server_name=self.server,
+            computer_name=self.user_creds.get_workstation(),
+            credential=authenticator,
+            return_authenticator=subsequent,
+            level=1,
+            query=query)
+
+        # Check the result.
+
+        res = self.ldb.search(self.machine_dn,
+                              scope=ldb.SCOPE_BASE,
+                              attrs=['dNSHostName',
+                                     'operatingSystem'])
+        self.assertEqual(1, len(res))
+
+        got_dns_hostname = res[0].get('dNSHostName', idx=0)
+        self.assertIsNone(got_dns_hostname)
+
+        got_os = res[0].get('operatingSystem', idx=0)
+        self.assertEqual(operating_system.encode('utf-8'), got_os)
+
     def test_SamLogonEx(self):
         c = self.get_netlogon_connection()
 
diff --git a/selftest/knownfail.d/netlogon-dns-host-name b/selftest/knownfail.d/netlogon-dns-host-name
new file mode 100644 (file)
index 0000000..2d0a0ec
--- /dev/null
@@ -0,0 +1,2 @@
+^samba.tests.py_credentials.samba.tests.py_credentials.PyCredentialsTests.test_set_dns_hostname_invalid_suffix\(
+^samba.tests.py_credentials.samba.tests.py_credentials.PyCredentialsTests.test_set_dns_hostname_with_flag\(