import samba.dcerpc.mgmt
import samba.dcerpc.netlogon
import samba.dcerpc.lsa
+import samba.dcerpc.schannel as schannel
import struct
from samba import gensec
from samba.tests.dcerpc.raw_testcase import RawDCERPCTest
@classmethod
def setUpDynamicTestCases(cls):
cls._setup_auth_pad_ignored()
+ cls._setup_test_schannel_invalid()
return
def _test_no_auth_request_bind_pfc_flags(self, req_pfc_flags, rep_pfc_flags):
self.assertNotConnected()
return
+ def _test_schannel_invalid_with_args(self,
+ pktype,
+ tail_padding):
+
+ ndr32 = base.transfer_syntax_ndr()
+ lsarpc_syntax = samba.dcerpc.lsa.abstract_syntax()
+
+ self.epmap_reconnect(lsarpc_syntax)
+
+ if pktype == dcerpc.DCERPC_PKT_AUTH3:
+ max_auth_length=4096
+ max_xmit_frag=self.max_xmit_frag
+ else:
+ max_auth_length=0xffff
+ max_xmit_frag=0xffff
+
+ tsf1_list = [ndr32]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
+ ctx1.transfer_syntaxes = tsf1_list
+ ctx_list = [ctx1]
+
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_SCHANNEL
+ auth_level = dcerpc.DCERPC_AUTH_LEVEL_PRIVACY
+ auth_context_id = 1
+
+ nl_auth = schannel.NL_AUTH_MESSAGE()
+ nl_auth.MessageType = schannel.NL_NEGOTIATE_REQUEST
+ nl_auth.Flags = 0
+
+ nl_auth.Flags |= schannel.NL_FLAG_OEM_NETBIOS_DOMAIN_NAME
+ nl_auth.oem_netbios_domain = "NBDOMAIN"
+
+ nl_auth.Flags |= schannel.NL_FLAG_OEM_NETBIOS_COMPUTER_NAME
+ nl_auth.oem_netbios_computer = "RAWCOMPUTER"
+
+ nl_auth.Flags |= schannel.NL_FLAG_UTF8_DNS_DOMAIN_NAME
+ nl_auth.utf8_dns_domain = "DNS.DOMAIN"
+
+ nl_auth.Flags |= schannel.NL_FLAG_UTF8_NETBIOS_COMPUTER_NAME
+ nl_auth.utf8_netbios_computer = "RAWCOMPUTER"
+
+ to_server = samba.ndr.ndr_pack(nl_auth)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=to_server)
+
+ if pktype != dcerpc.DCERPC_PKT_BIND:
+ req = self.generate_bind(call_id=1,
+ ctx_list=ctx_list)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
+ auth_length=0)
+ self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
+ sda_str = self.secondary_address
+ sda_len = len(sda_str) + 1
+ mod_len = (2 + sda_len) % 4
+ if mod_len != 0:
+ sda_pad = 4 - mod_len
+ else:
+ sda_pad = 0
+ self.assertEqual(rep.u.secondary_address_size, sda_len)
+ self.assertEqual(rep.u.secondary_address, sda_str)
+ self.assertPadding(rep.u._pad1, sda_pad)
+ self.assertEqual(rep.u.num_results, 1)
+ self.assertEqual(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+ self.assertEqual(rep.u.ctx_list[0].reason,
+ dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+
+ if pktype == dcerpc.DCERPC_PKT_AUTH3:
+ req = self.generate_auth3(call_id=2,
+ auth_info=auth_info)
+ expected_fault = dcerpc.DCERPC_NCA_S_PROTO_ERROR
+ else:
+ req = self.generate_alter(call_id=2,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ expected_fault = dcerpc.DCERPC_FAULT_ACCESS_DENIED
+ else:
+ req = self.generate_bind(call_id=1,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ req_pdu = self._add_auth_padding(req,
+ tail_padding=tail_padding,
+ max_auth_length=max_auth_length,
+ max_xmit_frag=max_xmit_frag)
+ self.send_pdu_blob(req_pdu)
+ rep = self.recv_pdu()
+ if pktype != dcerpc.DCERPC_PKT_BIND:
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id,
+ pfc_flags=req.pfc_flags |
+ dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
+ auth_length=0)
+ self.assertNotEqual(rep.u.alloc_hint, 0)
+ self.assertEqual(rep.u.context_id, 0)
+ self.assertEqual(rep.u.cancel_count, 0)
+ self.assertEqual(rep.u.flags, 0)
+ self.assertEqual(rep.u.status, expected_fault)
+ self.assertEqual(rep.u.reserved, 0)
+ self.assertEqual(len(rep.u.error_and_verifier), 0)
+ else:
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
+ auth_length=0)
+ self.assertEqual(rep.u.reject_reason,
+ dcerpc.DCERPC_BIND_NAK_REASON_INVALID_CHECKSUM)
+ self.assertEqual(rep.u.num_versions, 1)
+ self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers)
+ self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
+ self.assertPadding(rep.u._pad, 3)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+
+ @classmethod
+ def _setup_test_schannel_invalid(cls):
+ pktype_methods = {
+ "bind": dcerpc.DCERPC_PKT_BIND,
+ "alter": dcerpc.DCERPC_PKT_ALTER,
+ "auth3": dcerpc.DCERPC_PKT_AUTH3,
+ }
+ paddings = {
+ "no_padding": False,
+ "tail_padding": True,
+ }
+
+ for pktype_method in pktype_methods.keys():
+ for padding in paddings.keys():
+ tname = "%s_%s" % (
+ pktype_method,
+ padding,
+ )
+ targs = (
+ pktype_methods[pktype_method],
+ paddings[padding],
+ )
+ cls.generate_dynamic_test("test_schannel_invalid",
+ tname, *targs)
+ return
+
if __name__ == "__main__":
global_ndr_print = True