From: Stefan Metzmacher Date: Mon, 16 Dec 2024 14:18:54 +0000 (+0100) Subject: python:tests/krb5: add domain trust tests to netlogon.py X-Git-Tag: tdb-1.4.13~197 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=ca795ef4720411fb21607a5bd67da2d0beb3f54e;p=thirdparty%2Fsamba.git python:tests/krb5: add domain trust tests to netlogon.py Signed-off-by: Stefan Metzmacher Reviewed-by: Jennifer Sutton --- diff --git a/python/samba/tests/krb5/netlogon.py b/python/samba/tests/krb5/netlogon.py index 391211153aa..8d3158619f5 100755 --- a/python/samba/tests/krb5/netlogon.py +++ b/python/samba/tests/krb5/netlogon.py @@ -26,10 +26,11 @@ from samba import ( credentials, ntstatus, NTSTATUSError, + hresult, generate_random_password, generate_random_bytes, ) -from samba.dcerpc import netlogon, samr, misc, ntlmssp, krb5pac +from samba.dcerpc import netlogon, samr, misc, ntlmssp, krb5pac, security, lsa from samba.ndr import ndr_deepcopy, ndr_print, ndr_pack from samba.crypto import md4_hash_blob from samba.tests import DynamicTestCase, env_get_var_value @@ -62,8 +63,16 @@ class NetlogonSchannel(KDCBaseTest): "ticket_samlogon", ] + trusts = [ + "wks", + "bdc", + "rodc", + "uptrust", + "downtrust", + ] + for test in tests: - for trust in ["wks", "bdc", "rodc"]: + for trust in trusts: for auth3_flags in [0x603fffff, 0x613fffff, 0xe13fffff]: setup_test(test, trust, "auth3", auth3_flags) for auth3_flags in [0x00004004, 0x00004000, 0x01000000]: @@ -75,6 +84,10 @@ class NetlogonSchannel(KDCBaseTest): for authK_flags in [0x613fffff, 0x413fffff, 0x400001ff]: setup_test(test, trust, "authK", authK_flags) + for trust in trusts: + setup_test("simple", trust, "auth3", 0x613fffff) + setup_test("simple", trust, "authK", 0xe13fffff) + def setUp(self): super().setUp() self.do_asn1_print = global_asn1_print @@ -126,6 +139,70 @@ class NetlogonSchannel(KDCBaseTest): computer_creds = krbtgt_creds.get_rodc_computer_creds() return computer_creds + def get_uptrust1_creds(self): + + # This creates a forest trust + + trust_dns_name = "netlogon-fdom.netlogon.example.com" + trust_nbt_name = "NETLOGON-FDOM" + trust_sid = security.dom_sid("S-1-5-21-1-2-3") + + trust_enc_types = lsa.TrustDomainInfoSupportedEncTypes() + trust_enc_types.enc_types = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 + + trust_info = lsa.TrustDomainInfoInfoEx() + trust_info.trust_type = lsa.LSA_TRUST_TYPE_UPLEVEL + trust_info.trust_direction = 0 + trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_INBOUND + trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_OUTBOUND + trust_info.trust_attributes = 0 + trust_info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_FOREST_TRANSITIVE + + trust_info.domain_name.string = trust_dns_name + trust_info.netbios_name.string = trust_nbt_name + trust_info.sid = trust_sid + + trust_incoming_password = "%sIncomingPassword!" % trust_dns_name + trust_outgoing_password = "%sOutgoingPassword!" % trust_dns_name + + _, _, _, trust_account_creds = \ + self.create_trust(trust_info, + trust_enc_types=trust_enc_types, + trust_incoming_password=trust_incoming_password, + trust_outgoing_password=trust_outgoing_password, + preserve=False) + + return trust_account_creds + + def get_downtrust1_creds(self): + + # This creates a downlevel external trust + + trust_nbt_name = "NETLOGON-EDOM" + trust_sid = security.dom_sid("S-1-5-21-3-2-1") + + trust_info = lsa.TrustDomainInfoInfoEx() + trust_info.trust_type = lsa.LSA_TRUST_TYPE_DOWNLEVEL + trust_info.trust_direction = 0 + trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_INBOUND + trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_OUTBOUND + trust_info.trust_attributes = 0 + + trust_info.domain_name.string = trust_nbt_name + trust_info.netbios_name.string = trust_nbt_name + trust_info.sid = trust_sid + + trust_incoming_password = "%sIncomingPassword!" % trust_nbt_name + trust_outgoing_password = "%sOutgoingPassword!" % trust_nbt_name + + _, _, _, trust_account_creds = \ + self.create_trust(trust_info, + trust_incoming_password=trust_incoming_password, + trust_outgoing_password=trust_outgoing_password, + preserve=False) + + return trust_account_creds + def get_anon_conn(self): dc_server = self.dc_server conn = netlogon.netlogon(f'ncacn_ip_tcp:{dc_server}', @@ -182,7 +259,12 @@ class NetlogonSchannel(KDCBaseTest): expect_error=None): (auth_type, auth_level) = conn.auth_info() - trust_account_name = trust_creds.get_username() + secure_channel_type = trust_creds.get_secure_channel_type() + if secure_channel_type == misc.SEC_CHAN_DNS_DOMAIN: + outgoing_creds = trust_creds.get_trust_outgoing_creds() + trust_account_name = outgoing_creds.get_realm().lower() + "." + else: + trust_account_name = trust_creds.get_username() trust_computer_name = trust_creds.get_workstation() client_challenge = credentials.netlogon_creds_random_challenge() @@ -199,7 +281,7 @@ class NetlogonSchannel(KDCBaseTest): credentials.netlogon_creds_client_init( client_account=trust_account_name, client_computer_name=trust_computer_name, - secure_channel_type=trust_creds.get_secure_channel_type(), + secure_channel_type=secure_channel_type, client_challenge=client_challenge, server_challenge=server_challenge, machine_password=machine_password, @@ -241,14 +323,19 @@ class NetlogonSchannel(KDCBaseTest): expect_error=None): (auth_type, auth_level) = conn.auth_info() - trust_account_name = trust_creds.get_username() + secure_channel_type = trust_creds.get_secure_channel_type() + if secure_channel_type == misc.SEC_CHAN_DNS_DOMAIN: + outgoing_creds = trust_creds.get_trust_outgoing_creds() + trust_account_name = outgoing_creds.get_realm().lower() + "." + else: + trust_account_name = trust_creds.get_username() trust_computer_name = trust_creds.get_workstation() ncreds = \ credentials.netlogon_creds_kerberos_init( client_account=trust_account_name, client_computer_name=trust_computer_name, - secure_channel_type=trust_creds.get_secure_channel_type(), + secure_channel_type=secure_channel_type, client_requested_flags=negotiate_flags, negotiate_flags=negotiate_flags) @@ -779,9 +866,24 @@ class NetlogonSchannel(KDCBaseTest): elif logon_type == netlogon.NetlogonTicketLogonInformation: user_tgt = self.get_tgt(user_creds) - service_ticket = self.get_service_ticket(user_tgt, trust_creds) - service_ticket_data = self.der_encode(service_ticket.ticket, - asn1Spec=krb5_asn1.Ticket()) + if self.is_domain_trust(ncreds): + # + # We keep it simple for now and + # just pass the user_tgt + # + # It will generate + # NETLOGON_TICKET_LOGON_FAILED_LOGON + # with HRES_SEC_E_WRONG_PRINCIPAL + # + # But here we're only testing + # the netlogon as transport + # + service_ticket_data = self.der_encode(user_tgt.ticket, + asn1Spec=krb5_asn1.Ticket()) + else: + service_ticket = self.get_service_ticket(user_tgt, trust_creds) + service_ticket_data = self.der_encode(service_ticket.ticket, + asn1Spec=krb5_asn1.Ticket()) logon = netlogon.netr_TicketLogonInfo() logon.request_options = 0 @@ -971,6 +1073,10 @@ class NetlogonSchannel(KDCBaseTest): creds = self.get_bdc1_creds() elif trust == "rodc": creds = self.get_rodc1_creds() + elif trust == "uptrust": + creds = self.get_uptrust1_creds() + elif trust == "downtrust": + creds = self.get_downtrust1_creds() self.assertIsNotNone(creds) proposed_flags = flags @@ -1027,7 +1133,9 @@ class NetlogonSchannel(KDCBaseTest): expect_new_password = self.get_samr_Password(nt_hash) old_nt_hash = None if self.is_domain_trust(ncreds): - old_nt_hash = trust_creds.get_old_hash() + old_nt_hash = trust_creds.get_old_nt_hash() + if old_nt_hash is None: + old_nt_hash = nt_hash if old_nt_hash: expect_old_password = self.get_samr_Password(old_nt_hash) else: @@ -1076,6 +1184,8 @@ class NetlogonSchannel(KDCBaseTest): expect_get_error = ntstatus.NT_STATUS_ACCESS_DENIED elif ncreds.secure_channel_type == misc.SEC_CHAN_RODC: expect_get_error = ntstatus.NT_STATUS_ACCESS_DENIED + elif self.is_domain_trust(ncreds): + expect_get_error = ntstatus.NT_STATUS_ACCESS_DENIED else: expect_get_error = None self.do_ServerPasswordGet(ncreds, conn, @@ -1307,6 +1417,8 @@ class NetlogonSchannel(KDCBaseTest): opaque_buffer = b'invalid_opaque_buffer' if ncreds.secure_channel_type == misc.SEC_CHAN_WKSTA: expect_invalid_error = ntstatus.NT_STATUS_ACCESS_DENIED + elif self.is_domain_trust(ncreds): + expect_invalid_error = ntstatus.NT_STATUS_ACCESS_DENIED else: expect_invalid_error = ntstatus.NT_STATUS_INVALID_PARAMETER self.do_SendToSam(ncreds, conn, opaque_buffer, @@ -1322,6 +1434,8 @@ class NetlogonSchannel(KDCBaseTest): opaque_buffer = ndr_pack(bmsg) if ncreds.secure_channel_type == misc.SEC_CHAN_WKSTA: expect_not_found_error = ntstatus.NT_STATUS_ACCESS_DENIED + elif self.is_domain_trust(ncreds): + expect_not_found_error = ntstatus.NT_STATUS_ACCESS_DENIED elif expect_broken_crypto: expect_not_found_error = ntstatus.NT_STATUS_INVALID_PARAMETER elif ncreds.secure_channel_type == misc.SEC_CHAN_RODC: @@ -1341,6 +1455,8 @@ class NetlogonSchannel(KDCBaseTest): opaque_buffer = ndr_pack(bmsg) if ncreds.secure_channel_type == misc.SEC_CHAN_WKSTA: expect_no_error = ntstatus.NT_STATUS_ACCESS_DENIED + elif self.is_domain_trust(ncreds): + expect_no_error = ntstatus.NT_STATUS_ACCESS_DENIED elif expect_broken_crypto: expect_no_error = ntstatus.NT_STATUS_INVALID_PARAMETER elif ncreds.secure_channel_type == misc.SEC_CHAN_RODC: @@ -1735,20 +1851,34 @@ class NetlogonSchannel(KDCBaseTest): validation_level, expect_send_encrypted, expect_recv_encrypted) - if validationEx.results & netlogon.NETLOGON_TICKET_LOGON_SOURCE_USER_CLAIMS: + if self.is_domain_trust(ncreds): + self.assertEqual(validationEx.results, + netlogon.NETLOGON_TICKET_LOGON_FAILED_LOGON) + elif validationEx.results & netlogon.NETLOGON_TICKET_LOGON_SOURCE_USER_CLAIMS: self.assertEqual(validationEx.results, netlogon.NETLOGON_TICKET_LOGON_SOURCE_USER_CLAIMS | netlogon.NETLOGON_TICKET_LOGON_FULL_SIGNATURE_PRESENT) else: self.assertEqual(validationEx.results, netlogon.NETLOGON_TICKET_LOGON_FULL_SIGNATURE_PRESENT) - self.assertEqual(validationEx.kerberos_status[0], ntstatus.NT_STATUS_OK) - self.assertEqual(validationEx.netlogon_status[0], ntstatus.NT_STATUS_OK) - self.assertIsNone(validationEx.source_of_status.string) - self.assertIsNotNone(validationEx.user_information) - self.assertNotEqual(validationEx.user_information.base.rid, 0) - self.assertEqual(validationEx.user_information.base.key.key, list(b'\x00' *16)) - self.assertIsNone(validationEx.device_information) + if self.is_domain_trust(ncreds): + self.assertEqual(validationEx.kerberos_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL) + self.assertEqual(validationEx.netlogon_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL) + self.assertEqual(validationEx.source_of_status.string, self.dc_server.lower()) + self.assertIsNone(validationEx.user_information) + self.assertIsNone(validationEx.device_information) + self.assertEqual(validationEx.user_claims_length, 0) + self.assertIsNone(validationEx.user_claims) + self.assertEqual(validationEx.device_claims_length, 0) + self.assertIsNone(validationEx.device_claims) + else: + self.assertEqual(validationEx.kerberos_status[0], ntstatus.NT_STATUS_OK) + self.assertEqual(validationEx.netlogon_status[0], ntstatus.NT_STATUS_OK) + self.assertIsNone(validationEx.source_of_status.string) + self.assertIsNotNone(validationEx.user_information) + self.assertNotEqual(validationEx.user_information.base.rid, 0) + self.assertEqual(validationEx.user_information.base.key.key, list(b'\x00' *16)) + self.assertIsNone(validationEx.device_information) expect_send_encrypted = False expect_recv_encrypted = False @@ -1758,14 +1888,25 @@ class NetlogonSchannel(KDCBaseTest): expect_send_encrypted, expect_recv_encrypted) self.assertEqual(validationWF.results, validationEx.results) - self.assertEqual(validationWF.kerberos_status[0], ntstatus.NT_STATUS_OK) - self.assertEqual(validationWF.netlogon_status[0], ntstatus.NT_STATUS_OK) - self.assertIsNone(validationWF.source_of_status.string) - self.assertIsNotNone(validationWF.user_information) - self.assertEqual(validationWF.user_information.base.rid, - validationEx.user_information.base.rid) - self.assertEqual(validationWF.user_information.base.key.key, list(b'\x00' *16)) - self.assertIsNone(validationWF.device_information) + if self.is_domain_trust(ncreds): + self.assertEqual(validationWF.kerberos_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL) + self.assertEqual(validationWF.netlogon_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL) + self.assertEqual(validationWF.source_of_status.string, self.dc_server.lower()) + self.assertIsNone(validationWF.user_information) + self.assertIsNone(validationWF.device_information) + self.assertEqual(validationWF.user_claims_length, 0) + self.assertIsNone(validationWF.user_claims) + self.assertEqual(validationWF.device_claims_length, 0) + self.assertIsNone(validationWF.device_claims) + else: + self.assertEqual(validationWF.kerberos_status[0], ntstatus.NT_STATUS_OK) + self.assertEqual(validationWF.netlogon_status[0], ntstatus.NT_STATUS_OK) + self.assertIsNone(validationWF.source_of_status.string) + self.assertIsNotNone(validationWF.user_information) + self.assertEqual(validationWF.user_information.base.rid, + validationEx.user_information.base.rid) + self.assertEqual(validationWF.user_information.base.key.key, list(b'\x00' *16)) + self.assertIsNone(validationWF.device_information) self.do_CheckCapabilities(ncreds, conn) return @@ -1967,6 +2108,15 @@ class NetlogonSchannel(KDCBaseTest): self.do_CheckCapabilities(bdc1_ncreds, bdc1_conn) return + def _test_simple_with_args(self, trust, authX, flags): + (creds, ncreds, conn, expect_encrypted) = \ + self._prepare_ncreds_conn_with_args(trust, authX, flags) + + if conn is None: + return + + self.do_CheckCapabilities(ncreds, conn) + return if __name__ == "__main__": global_asn1_print = True