credentials,
ntstatus,
NTSTATUSError,
+ hresult,
generate_random_password,
generate_random_bytes,
)
-from samba.dcerpc import netlogon, samr, misc, ntlmssp, krb5pac
+from samba.dcerpc import netlogon, samr, misc, ntlmssp, krb5pac, security, lsa
from samba.ndr import ndr_deepcopy, ndr_print, ndr_pack
from samba.crypto import md4_hash_blob
from samba.tests import DynamicTestCase, env_get_var_value
"ticket_samlogon",
]
+ trusts = [
+ "wks",
+ "bdc",
+ "rodc",
+ "uptrust",
+ "downtrust",
+ ]
+
for test in tests:
- for trust in ["wks", "bdc", "rodc"]:
+ for trust in trusts:
for auth3_flags in [0x603fffff, 0x613fffff, 0xe13fffff]:
setup_test(test, trust, "auth3", auth3_flags)
for auth3_flags in [0x00004004, 0x00004000, 0x01000000]:
for authK_flags in [0x613fffff, 0x413fffff, 0x400001ff]:
setup_test(test, trust, "authK", authK_flags)
+ for trust in trusts:
+ setup_test("simple", trust, "auth3", 0x613fffff)
+ setup_test("simple", trust, "authK", 0xe13fffff)
+
def setUp(self):
super().setUp()
self.do_asn1_print = global_asn1_print
computer_creds = krbtgt_creds.get_rodc_computer_creds()
return computer_creds
+ def get_uptrust1_creds(self):
+
+ # This creates a forest trust
+
+ trust_dns_name = "netlogon-fdom.netlogon.example.com"
+ trust_nbt_name = "NETLOGON-FDOM"
+ trust_sid = security.dom_sid("S-1-5-21-1-2-3")
+
+ trust_enc_types = lsa.TrustDomainInfoSupportedEncTypes()
+ trust_enc_types.enc_types = security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
+
+ trust_info = lsa.TrustDomainInfoInfoEx()
+ trust_info.trust_type = lsa.LSA_TRUST_TYPE_UPLEVEL
+ trust_info.trust_direction = 0
+ trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_INBOUND
+ trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_OUTBOUND
+ trust_info.trust_attributes = 0
+ trust_info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_FOREST_TRANSITIVE
+
+ trust_info.domain_name.string = trust_dns_name
+ trust_info.netbios_name.string = trust_nbt_name
+ trust_info.sid = trust_sid
+
+ trust_incoming_password = "%sIncomingPassword!" % trust_dns_name
+ trust_outgoing_password = "%sOutgoingPassword!" % trust_dns_name
+
+ _, _, _, trust_account_creds = \
+ self.create_trust(trust_info,
+ trust_enc_types=trust_enc_types,
+ trust_incoming_password=trust_incoming_password,
+ trust_outgoing_password=trust_outgoing_password,
+ preserve=False)
+
+ return trust_account_creds
+
+ def get_downtrust1_creds(self):
+
+ # This creates a downlevel external trust
+
+ trust_nbt_name = "NETLOGON-EDOM"
+ trust_sid = security.dom_sid("S-1-5-21-3-2-1")
+
+ trust_info = lsa.TrustDomainInfoInfoEx()
+ trust_info.trust_type = lsa.LSA_TRUST_TYPE_DOWNLEVEL
+ trust_info.trust_direction = 0
+ trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_INBOUND
+ trust_info.trust_direction |= lsa.LSA_TRUST_DIRECTION_OUTBOUND
+ trust_info.trust_attributes = 0
+
+ trust_info.domain_name.string = trust_nbt_name
+ trust_info.netbios_name.string = trust_nbt_name
+ trust_info.sid = trust_sid
+
+ trust_incoming_password = "%sIncomingPassword!" % trust_nbt_name
+ trust_outgoing_password = "%sOutgoingPassword!" % trust_nbt_name
+
+ _, _, _, trust_account_creds = \
+ self.create_trust(trust_info,
+ trust_incoming_password=trust_incoming_password,
+ trust_outgoing_password=trust_outgoing_password,
+ preserve=False)
+
+ return trust_account_creds
+
def get_anon_conn(self):
dc_server = self.dc_server
conn = netlogon.netlogon(f'ncacn_ip_tcp:{dc_server}',
expect_error=None):
(auth_type, auth_level) = conn.auth_info()
- trust_account_name = trust_creds.get_username()
+ secure_channel_type = trust_creds.get_secure_channel_type()
+ if secure_channel_type == misc.SEC_CHAN_DNS_DOMAIN:
+ outgoing_creds = trust_creds.get_trust_outgoing_creds()
+ trust_account_name = outgoing_creds.get_realm().lower() + "."
+ else:
+ trust_account_name = trust_creds.get_username()
trust_computer_name = trust_creds.get_workstation()
client_challenge = credentials.netlogon_creds_random_challenge()
credentials.netlogon_creds_client_init(
client_account=trust_account_name,
client_computer_name=trust_computer_name,
- secure_channel_type=trust_creds.get_secure_channel_type(),
+ secure_channel_type=secure_channel_type,
client_challenge=client_challenge,
server_challenge=server_challenge,
machine_password=machine_password,
expect_error=None):
(auth_type, auth_level) = conn.auth_info()
- trust_account_name = trust_creds.get_username()
+ secure_channel_type = trust_creds.get_secure_channel_type()
+ if secure_channel_type == misc.SEC_CHAN_DNS_DOMAIN:
+ outgoing_creds = trust_creds.get_trust_outgoing_creds()
+ trust_account_name = outgoing_creds.get_realm().lower() + "."
+ else:
+ trust_account_name = trust_creds.get_username()
trust_computer_name = trust_creds.get_workstation()
ncreds = \
credentials.netlogon_creds_kerberos_init(
client_account=trust_account_name,
client_computer_name=trust_computer_name,
- secure_channel_type=trust_creds.get_secure_channel_type(),
+ secure_channel_type=secure_channel_type,
client_requested_flags=negotiate_flags,
negotiate_flags=negotiate_flags)
elif logon_type == netlogon.NetlogonTicketLogonInformation:
user_tgt = self.get_tgt(user_creds)
- service_ticket = self.get_service_ticket(user_tgt, trust_creds)
- service_ticket_data = self.der_encode(service_ticket.ticket,
- asn1Spec=krb5_asn1.Ticket())
+ if self.is_domain_trust(ncreds):
+ #
+ # We keep it simple for now and
+ # just pass the user_tgt
+ #
+ # It will generate
+ # NETLOGON_TICKET_LOGON_FAILED_LOGON
+ # with HRES_SEC_E_WRONG_PRINCIPAL
+ #
+ # But here we're only testing
+ # the netlogon as transport
+ #
+ service_ticket_data = self.der_encode(user_tgt.ticket,
+ asn1Spec=krb5_asn1.Ticket())
+ else:
+ service_ticket = self.get_service_ticket(user_tgt, trust_creds)
+ service_ticket_data = self.der_encode(service_ticket.ticket,
+ asn1Spec=krb5_asn1.Ticket())
logon = netlogon.netr_TicketLogonInfo()
logon.request_options = 0
creds = self.get_bdc1_creds()
elif trust == "rodc":
creds = self.get_rodc1_creds()
+ elif trust == "uptrust":
+ creds = self.get_uptrust1_creds()
+ elif trust == "downtrust":
+ creds = self.get_downtrust1_creds()
self.assertIsNotNone(creds)
proposed_flags = flags
expect_new_password = self.get_samr_Password(nt_hash)
old_nt_hash = None
if self.is_domain_trust(ncreds):
- old_nt_hash = trust_creds.get_old_hash()
+ old_nt_hash = trust_creds.get_old_nt_hash()
+ if old_nt_hash is None:
+ old_nt_hash = nt_hash
if old_nt_hash:
expect_old_password = self.get_samr_Password(old_nt_hash)
else:
expect_get_error = ntstatus.NT_STATUS_ACCESS_DENIED
elif ncreds.secure_channel_type == misc.SEC_CHAN_RODC:
expect_get_error = ntstatus.NT_STATUS_ACCESS_DENIED
+ elif self.is_domain_trust(ncreds):
+ expect_get_error = ntstatus.NT_STATUS_ACCESS_DENIED
else:
expect_get_error = None
self.do_ServerPasswordGet(ncreds, conn,
opaque_buffer = b'invalid_opaque_buffer'
if ncreds.secure_channel_type == misc.SEC_CHAN_WKSTA:
expect_invalid_error = ntstatus.NT_STATUS_ACCESS_DENIED
+ elif self.is_domain_trust(ncreds):
+ expect_invalid_error = ntstatus.NT_STATUS_ACCESS_DENIED
else:
expect_invalid_error = ntstatus.NT_STATUS_INVALID_PARAMETER
self.do_SendToSam(ncreds, conn, opaque_buffer,
opaque_buffer = ndr_pack(bmsg)
if ncreds.secure_channel_type == misc.SEC_CHAN_WKSTA:
expect_not_found_error = ntstatus.NT_STATUS_ACCESS_DENIED
+ elif self.is_domain_trust(ncreds):
+ expect_not_found_error = ntstatus.NT_STATUS_ACCESS_DENIED
elif expect_broken_crypto:
expect_not_found_error = ntstatus.NT_STATUS_INVALID_PARAMETER
elif ncreds.secure_channel_type == misc.SEC_CHAN_RODC:
opaque_buffer = ndr_pack(bmsg)
if ncreds.secure_channel_type == misc.SEC_CHAN_WKSTA:
expect_no_error = ntstatus.NT_STATUS_ACCESS_DENIED
+ elif self.is_domain_trust(ncreds):
+ expect_no_error = ntstatus.NT_STATUS_ACCESS_DENIED
elif expect_broken_crypto:
expect_no_error = ntstatus.NT_STATUS_INVALID_PARAMETER
elif ncreds.secure_channel_type == misc.SEC_CHAN_RODC:
validation_level,
expect_send_encrypted,
expect_recv_encrypted)
- if validationEx.results & netlogon.NETLOGON_TICKET_LOGON_SOURCE_USER_CLAIMS:
+ if self.is_domain_trust(ncreds):
+ self.assertEqual(validationEx.results,
+ netlogon.NETLOGON_TICKET_LOGON_FAILED_LOGON)
+ elif validationEx.results & netlogon.NETLOGON_TICKET_LOGON_SOURCE_USER_CLAIMS:
self.assertEqual(validationEx.results,
netlogon.NETLOGON_TICKET_LOGON_SOURCE_USER_CLAIMS |
netlogon.NETLOGON_TICKET_LOGON_FULL_SIGNATURE_PRESENT)
else:
self.assertEqual(validationEx.results,
netlogon.NETLOGON_TICKET_LOGON_FULL_SIGNATURE_PRESENT)
- self.assertEqual(validationEx.kerberos_status[0], ntstatus.NT_STATUS_OK)
- self.assertEqual(validationEx.netlogon_status[0], ntstatus.NT_STATUS_OK)
- self.assertIsNone(validationEx.source_of_status.string)
- self.assertIsNotNone(validationEx.user_information)
- self.assertNotEqual(validationEx.user_information.base.rid, 0)
- self.assertEqual(validationEx.user_information.base.key.key, list(b'\x00' *16))
- self.assertIsNone(validationEx.device_information)
+ if self.is_domain_trust(ncreds):
+ self.assertEqual(validationEx.kerberos_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL)
+ self.assertEqual(validationEx.netlogon_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL)
+ self.assertEqual(validationEx.source_of_status.string, self.dc_server.lower())
+ self.assertIsNone(validationEx.user_information)
+ self.assertIsNone(validationEx.device_information)
+ self.assertEqual(validationEx.user_claims_length, 0)
+ self.assertIsNone(validationEx.user_claims)
+ self.assertEqual(validationEx.device_claims_length, 0)
+ self.assertIsNone(validationEx.device_claims)
+ else:
+ self.assertEqual(validationEx.kerberos_status[0], ntstatus.NT_STATUS_OK)
+ self.assertEqual(validationEx.netlogon_status[0], ntstatus.NT_STATUS_OK)
+ self.assertIsNone(validationEx.source_of_status.string)
+ self.assertIsNotNone(validationEx.user_information)
+ self.assertNotEqual(validationEx.user_information.base.rid, 0)
+ self.assertEqual(validationEx.user_information.base.key.key, list(b'\x00' *16))
+ self.assertIsNone(validationEx.device_information)
expect_send_encrypted = False
expect_recv_encrypted = False
expect_send_encrypted,
expect_recv_encrypted)
self.assertEqual(validationWF.results, validationEx.results)
- self.assertEqual(validationWF.kerberos_status[0], ntstatus.NT_STATUS_OK)
- self.assertEqual(validationWF.netlogon_status[0], ntstatus.NT_STATUS_OK)
- self.assertIsNone(validationWF.source_of_status.string)
- self.assertIsNotNone(validationWF.user_information)
- self.assertEqual(validationWF.user_information.base.rid,
- validationEx.user_information.base.rid)
- self.assertEqual(validationWF.user_information.base.key.key, list(b'\x00' *16))
- self.assertIsNone(validationWF.device_information)
+ if self.is_domain_trust(ncreds):
+ self.assertEqual(validationWF.kerberos_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL)
+ self.assertEqual(validationWF.netlogon_status[0], hresult.HRES_SEC_E_WRONG_PRINCIPAL)
+ self.assertEqual(validationWF.source_of_status.string, self.dc_server.lower())
+ self.assertIsNone(validationWF.user_information)
+ self.assertIsNone(validationWF.device_information)
+ self.assertEqual(validationWF.user_claims_length, 0)
+ self.assertIsNone(validationWF.user_claims)
+ self.assertEqual(validationWF.device_claims_length, 0)
+ self.assertIsNone(validationWF.device_claims)
+ else:
+ self.assertEqual(validationWF.kerberos_status[0], ntstatus.NT_STATUS_OK)
+ self.assertEqual(validationWF.netlogon_status[0], ntstatus.NT_STATUS_OK)
+ self.assertIsNone(validationWF.source_of_status.string)
+ self.assertIsNotNone(validationWF.user_information)
+ self.assertEqual(validationWF.user_information.base.rid,
+ validationEx.user_information.base.rid)
+ self.assertEqual(validationWF.user_information.base.key.key, list(b'\x00' *16))
+ self.assertIsNone(validationWF.device_information)
self.do_CheckCapabilities(ncreds, conn)
return
self.do_CheckCapabilities(bdc1_ncreds, bdc1_conn)
return
+ def _test_simple_with_args(self, trust, authX, flags):
+ (creds, ncreds, conn, expect_encrypted) = \
+ self._prepare_ncreds_conn_with_args(trust, authX, flags)
+
+ if conn is None:
+ return
+
+ self.do_CheckCapabilities(ncreds, conn)
+ return
if __name__ == "__main__":
global_asn1_print = True