]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
python:tests/krb5: add domain trust tests to netlogon.py
authorStefan Metzmacher <metze@samba.org>
Mon, 16 Dec 2024 14:18:54 +0000 (15:18 +0100)
committerStefan Metzmacher <metze@samba.org>
Wed, 8 Jan 2025 09:13:31 +0000 (09:13 +0000)
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jennifer Sutton <jennifersutton@catalyst.net.nz>
python/samba/tests/krb5/netlogon.py

index 391211153aa564be66ad4e2545ca5ceff47ebf6d..8d3158619f549a5694e382851d7246498b1d14d8 100755 (executable)
@@ -26,10 +26,11 @@ from samba import (
     credentials,
     ntstatus,
     NTSTATUSError,
+    hresult,
     generate_random_password,
     generate_random_bytes,
 )
-from samba.dcerpc import netlogon, samr, misc, ntlmssp, krb5pac
+from samba.dcerpc import netlogon, samr, misc, ntlmssp, krb5pac, security, lsa
 from samba.ndr import ndr_deepcopy, ndr_print, ndr_pack
 from samba.crypto import md4_hash_blob
 from samba.tests import DynamicTestCase, env_get_var_value
@@ -62,8 +63,16 @@ class NetlogonSchannel(KDCBaseTest):
             "ticket_samlogon",
         ]
 
+        trusts = [
+            "wks",
+            "bdc",
+            "rodc",
+            "uptrust",
+            "downtrust",
+        ]
+
         for test in tests:
-            for trust in ["wks", "bdc", "rodc"]:
+            for trust in trusts:
                 for auth3_flags in [0x603fffff, 0x613fffff, 0xe13fffff]:
                     setup_test(test, trust, "auth3", auth3_flags)
                 for auth3_flags in [0x00004004, 0x00004000, 0x01000000]:
@@ -75,6 +84,10 @@ class NetlogonSchannel(KDCBaseTest):
                 for authK_flags in [0x613fffff, 0x413fffff, 0x400001ff]:
                     setup_test(test, trust, "authK", authK_flags)
 
+        for trust in trusts:
+            setup_test("simple", trust, "auth3", 0x613fffff)
+            setup_test("simple", trust, "authK", 0xe13fffff)
+
     def setUp(self):
         super().setUp()
         self.do_asn1_print = global_asn1_print
@@ -126,6 +139,70 @@ class NetlogonSchannel(KDCBaseTest):
         computer_creds = krbtgt_creds.get_rodc_computer_creds()
         return computer_creds
 
+    def get_uptrust1_creds(self):
+
+        # This creates a forest trust
+
+        trust_dns_name = "netlogon-fdom.netlogon.example.com"
+        trust_nbt_name = "NETLOGON-FDOM"
+        trust_sid = security.dom_sid("S-1-5-21-1-2-3")
+
+        trust_enc_types = lsa.TrustDomainInfoSupportedEncTypes()
+        trust_enc_types.enc_types = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
+
+        trust_info = lsa.TrustDomainInfoInfoEx()
+        trust_info.trust_type = lsa.LSA_TRUST_TYPE_UPLEVEL
+        trust_info.trust_direction = 0
+        trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_INBOUND
+        trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_OUTBOUND
+        trust_info.trust_attributes = 0
+        trust_info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_FOREST_TRANSITIVE
+
+        trust_info.domain_name.string = trust_dns_name
+        trust_info.netbios_name.string = trust_nbt_name
+        trust_info.sid = trust_sid
+
+        trust_incoming_password = "%sIncomingPassword!" % trust_dns_name
+        trust_outgoing_password = "%sOutgoingPassword!" % trust_dns_name
+
+        _, _, _, trust_account_creds = \
+            self.create_trust(trust_info,
+                              trust_enc_types=trust_enc_types,
+                              trust_incoming_password=trust_incoming_password,
+                              trust_outgoing_password=trust_outgoing_password,
+                              preserve=False)
+
+        return trust_account_creds
+
+    def get_downtrust1_creds(self):
+
+        # This creates a downlevel external trust
+
+        trust_nbt_name = "NETLOGON-EDOM"
+        trust_sid = security.dom_sid("S-1-5-21-3-2-1")
+
+        trust_info = lsa.TrustDomainInfoInfoEx()
+        trust_info.trust_type = lsa.LSA_TRUST_TYPE_DOWNLEVEL
+        trust_info.trust_direction = 0
+        trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_INBOUND
+        trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_OUTBOUND
+        trust_info.trust_attributes = 0
+
+        trust_info.domain_name.string = trust_nbt_name
+        trust_info.netbios_name.string = trust_nbt_name
+        trust_info.sid = trust_sid
+
+        trust_incoming_password = "%sIncomingPassword!" % trust_nbt_name
+        trust_outgoing_password = "%sOutgoingPassword!" % trust_nbt_name
+
+        _, _, _, trust_account_creds = \
+            self.create_trust(trust_info,
+                              trust_incoming_password=trust_incoming_password,
+                              trust_outgoing_password=trust_outgoing_password,
+                              preserve=False)
+
+        return trust_account_creds
+
     def get_anon_conn(self):
         dc_server = self.dc_server
         conn = netlogon.netlogon(f'ncacn_ip_tcp:{dc_server}',
@@ -182,7 +259,12 @@ class NetlogonSchannel(KDCBaseTest):
             expect_error=None):
         (auth_type, auth_level) = conn.auth_info()
 
-        trust_account_name = trust_creds.get_username()
+        secure_channel_type = trust_creds.get_secure_channel_type()
+        if secure_channel_type == misc.SEC_CHAN_DNS_DOMAIN:
+            outgoing_creds = trust_creds.get_trust_outgoing_creds()
+            trust_account_name = outgoing_creds.get_realm().lower() + "."
+        else:
+            trust_account_name = trust_creds.get_username()
         trust_computer_name = trust_creds.get_workstation()
 
         client_challenge = credentials.netlogon_creds_random_challenge()
@@ -199,7 +281,7 @@ class NetlogonSchannel(KDCBaseTest):
             credentials.netlogon_creds_client_init(
                 client_account=trust_account_name,
                 client_computer_name=trust_computer_name,
-                secure_channel_type=trust_creds.get_secure_channel_type(),
+                secure_channel_type=secure_channel_type,
                 client_challenge=client_challenge,
                 server_challenge=server_challenge,
                 machine_password=machine_password,
@@ -241,14 +323,19 @@ class NetlogonSchannel(KDCBaseTest):
             expect_error=None):
         (auth_type, auth_level) = conn.auth_info()
 
-        trust_account_name = trust_creds.get_username()
+        secure_channel_type = trust_creds.get_secure_channel_type()
+        if secure_channel_type == misc.SEC_CHAN_DNS_DOMAIN:
+            outgoing_creds = trust_creds.get_trust_outgoing_creds()
+            trust_account_name = outgoing_creds.get_realm().lower() + "."
+        else:
+            trust_account_name = trust_creds.get_username()
         trust_computer_name = trust_creds.get_workstation()
 
         ncreds = \
             credentials.netlogon_creds_kerberos_init(
                 client_account=trust_account_name,
                 client_computer_name=trust_computer_name,
-                secure_channel_type=trust_creds.get_secure_channel_type(),
+                secure_channel_type=secure_channel_type,
                 client_requested_flags=negotiate_flags,
                 negotiate_flags=negotiate_flags)
 
@@ -779,9 +866,24 @@ class NetlogonSchannel(KDCBaseTest):
         elif logon_type == netlogon.NetlogonTicketLogonInformation:
             user_tgt = self.get_tgt(user_creds)
 
-            service_ticket = self.get_service_ticket(user_tgt, trust_creds)
-            service_ticket_data = self.der_encode(service_ticket.ticket,
-                                                  asn1Spec=krb5_asn1.Ticket())
+            if self.is_domain_trust(ncreds):
+                #
+                # We keep it simple for now and
+                # just pass the user_tgt
+                #
+                # It will generate
+                # NETLOGON_TICKET_LOGON_FAILED_LOGON
+                # with HRES_SEC_E_WRONG_PRINCIPAL
+                #
+                # But here we're only testing
+                # the netlogon as transport
+                #
+                service_ticket_data = self.der_encode(user_tgt.ticket,
+                                                      asn1Spec=krb5_asn1.Ticket())
+            else:
+                service_ticket = self.get_service_ticket(user_tgt, trust_creds)
+                service_ticket_data = self.der_encode(service_ticket.ticket,
+                                                      asn1Spec=krb5_asn1.Ticket())
 
             logon = netlogon.netr_TicketLogonInfo()
             logon.request_options = 0
@@ -971,6 +1073,10 @@ class NetlogonSchannel(KDCBaseTest):
             creds = self.get_bdc1_creds()
         elif trust == "rodc":
             creds = self.get_rodc1_creds()
+        elif trust == "uptrust":
+            creds = self.get_uptrust1_creds()
+        elif trust == "downtrust":
+            creds = self.get_downtrust1_creds()
         self.assertIsNotNone(creds)
 
         proposed_flags = flags
@@ -1027,7 +1133,9 @@ class NetlogonSchannel(KDCBaseTest):
         expect_new_password = self.get_samr_Password(nt_hash)
         old_nt_hash = None
         if self.is_domain_trust(ncreds):
-            old_nt_hash = trust_creds.get_old_hash()
+            old_nt_hash = trust_creds.get_old_nt_hash()
+            if old_nt_hash is None:
+                old_nt_hash = nt_hash
         if old_nt_hash:
             expect_old_password = self.get_samr_Password(old_nt_hash)
         else:
@@ -1076,6 +1184,8 @@ class NetlogonSchannel(KDCBaseTest):
             expect_get_error = ntstatus.NT_STATUS_ACCESS_DENIED
         elif ncreds.secure_channel_type == misc.SEC_CHAN_RODC:
             expect_get_error = ntstatus.NT_STATUS_ACCESS_DENIED
+        elif self.is_domain_trust(ncreds):
+            expect_get_error = ntstatus.NT_STATUS_ACCESS_DENIED
         else:
             expect_get_error = None
         self.do_ServerPasswordGet(ncreds, conn,
@@ -1307,6 +1417,8 @@ class NetlogonSchannel(KDCBaseTest):
         opaque_buffer = b'invalid_opaque_buffer'
         if ncreds.secure_channel_type == misc.SEC_CHAN_WKSTA:
             expect_invalid_error = ntstatus.NT_STATUS_ACCESS_DENIED
+        elif self.is_domain_trust(ncreds):
+            expect_invalid_error = ntstatus.NT_STATUS_ACCESS_DENIED
         else:
             expect_invalid_error = ntstatus.NT_STATUS_INVALID_PARAMETER
         self.do_SendToSam(ncreds, conn, opaque_buffer,
@@ -1322,6 +1434,8 @@ class NetlogonSchannel(KDCBaseTest):
         opaque_buffer = ndr_pack(bmsg)
         if ncreds.secure_channel_type == misc.SEC_CHAN_WKSTA:
             expect_not_found_error = ntstatus.NT_STATUS_ACCESS_DENIED
+        elif self.is_domain_trust(ncreds):
+            expect_not_found_error = ntstatus.NT_STATUS_ACCESS_DENIED
         elif expect_broken_crypto:
             expect_not_found_error = ntstatus.NT_STATUS_INVALID_PARAMETER
         elif ncreds.secure_channel_type == misc.SEC_CHAN_RODC:
@@ -1341,6 +1455,8 @@ class NetlogonSchannel(KDCBaseTest):
         opaque_buffer = ndr_pack(bmsg)
         if ncreds.secure_channel_type == misc.SEC_CHAN_WKSTA:
             expect_no_error = ntstatus.NT_STATUS_ACCESS_DENIED
+        elif self.is_domain_trust(ncreds):
+            expect_no_error = ntstatus.NT_STATUS_ACCESS_DENIED
         elif expect_broken_crypto:
             expect_no_error = ntstatus.NT_STATUS_INVALID_PARAMETER
         elif ncreds.secure_channel_type == misc.SEC_CHAN_RODC:
@@ -1735,20 +1851,34 @@ class NetlogonSchannel(KDCBaseTest):
                                        validation_level,
                                        expect_send_encrypted,
                                        expect_recv_encrypted)
-        if validationEx.results & netlogon.NETLOGON_TICKET_LOGON_SOURCE_USER_CLAIMS:
+        if self.is_domain_trust(ncreds):
+            self.assertEqual(validationEx.results,
+                netlogon.NETLOGON_TICKET_LOGON_FAILED_LOGON)
+        elif validationEx.results & netlogon.NETLOGON_TICKET_LOGON_SOURCE_USER_CLAIMS:
             self.assertEqual(validationEx.results,
                 netlogon.NETLOGON_TICKET_LOGON_SOURCE_USER_CLAIMS |
                 netlogon.NETLOGON_TICKET_LOGON_FULL_SIGNATURE_PRESENT)
         else:
             self.assertEqual(validationEx.results,
                 netlogon.NETLOGON_TICKET_LOGON_FULL_SIGNATURE_PRESENT)
-        self.assertEqual(validationEx.kerberos_status[0], ntstatus.NT_STATUS_OK)
-        self.assertEqual(validationEx.netlogon_status[0], ntstatus.NT_STATUS_OK)
-        self.assertIsNone(validationEx.source_of_status.string)
-        self.assertIsNotNone(validationEx.user_information)
-        self.assertNotEqual(validationEx.user_information.base.rid, 0)
-        self.assertEqual(validationEx.user_information.base.key.key, list(b'\x00' *16))
-        self.assertIsNone(validationEx.device_information)
+        if self.is_domain_trust(ncreds):
+            self.assertEqual(validationEx.kerberos_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL)
+            self.assertEqual(validationEx.netlogon_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL)
+            self.assertEqual(validationEx.source_of_status.string, self.dc_server.lower())
+            self.assertIsNone(validationEx.user_information)
+            self.assertIsNone(validationEx.device_information)
+            self.assertEqual(validationEx.user_claims_length, 0)
+            self.assertIsNone(validationEx.user_claims)
+            self.assertEqual(validationEx.device_claims_length, 0)
+            self.assertIsNone(validationEx.device_claims)
+        else:
+            self.assertEqual(validationEx.kerberos_status[0], ntstatus.NT_STATUS_OK)
+            self.assertEqual(validationEx.netlogon_status[0], ntstatus.NT_STATUS_OK)
+            self.assertIsNone(validationEx.source_of_status.string)
+            self.assertIsNotNone(validationEx.user_information)
+            self.assertNotEqual(validationEx.user_information.base.rid, 0)
+            self.assertEqual(validationEx.user_information.base.key.key, list(b'\x00' *16))
+            self.assertIsNone(validationEx.device_information)
 
         expect_send_encrypted = False
         expect_recv_encrypted = False
@@ -1758,14 +1888,25 @@ class NetlogonSchannel(KDCBaseTest):
                                               expect_send_encrypted,
                                               expect_recv_encrypted)
         self.assertEqual(validationWF.results, validationEx.results)
-        self.assertEqual(validationWF.kerberos_status[0], ntstatus.NT_STATUS_OK)
-        self.assertEqual(validationWF.netlogon_status[0], ntstatus.NT_STATUS_OK)
-        self.assertIsNone(validationWF.source_of_status.string)
-        self.assertIsNotNone(validationWF.user_information)
-        self.assertEqual(validationWF.user_information.base.rid,
-                         validationEx.user_information.base.rid)
-        self.assertEqual(validationWF.user_information.base.key.key, list(b'\x00' *16))
-        self.assertIsNone(validationWF.device_information)
+        if self.is_domain_trust(ncreds):
+            self.assertEqual(validationWF.kerberos_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL)
+            self.assertEqual(validationWF.netlogon_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL)
+            self.assertEqual(validationWF.source_of_status.string, self.dc_server.lower())
+            self.assertIsNone(validationWF.user_information)
+            self.assertIsNone(validationWF.device_information)
+            self.assertEqual(validationWF.user_claims_length, 0)
+            self.assertIsNone(validationWF.user_claims)
+            self.assertEqual(validationWF.device_claims_length, 0)
+            self.assertIsNone(validationWF.device_claims)
+        else:
+            self.assertEqual(validationWF.kerberos_status[0], ntstatus.NT_STATUS_OK)
+            self.assertEqual(validationWF.netlogon_status[0], ntstatus.NT_STATUS_OK)
+            self.assertIsNone(validationWF.source_of_status.string)
+            self.assertIsNotNone(validationWF.user_information)
+            self.assertEqual(validationWF.user_information.base.rid,
+                             validationEx.user_information.base.rid)
+            self.assertEqual(validationWF.user_information.base.key.key, list(b'\x00' *16))
+            self.assertIsNone(validationWF.device_information)
 
         self.do_CheckCapabilities(ncreds, conn)
         return
@@ -1967,6 +2108,15 @@ class NetlogonSchannel(KDCBaseTest):
         self.do_CheckCapabilities(bdc1_ncreds, bdc1_conn)
         return
 
+    def _test_simple_with_args(self, trust, authX, flags):
+        (creds, ncreds, conn, expect_encrypted) = \
+            self._prepare_ncreds_conn_with_args(trust, authX, flags)
+
+        if conn is None:
+            return
+
+        self.do_CheckCapabilities(ncreds, conn)
+        return
 
 if __name__ == "__main__":
     global_asn1_print = True