import struct
from samba import gensec
from samba.tests.dcerpc.raw_testcase import RawDCERPCTest
+from samba.tests import DynamicTestCase
from samba.ntstatus import (
NT_STATUS_SUCCESS
)
global_hexdump = False
+@DynamicTestCase
class TestDCERPC_BIND(RawDCERPCTest):
def setUp(self):
self.do_ndr_print = global_ndr_print
self.do_hexdump = global_hexdump
+ @classmethod
+ def setUpDynamicTestCases(cls):
+ cls._setup_auth_pad_ignored()
+ return
+
def _test_no_auth_request_bind_pfc_flags(self, req_pfc_flags, rep_pfc_flags):
ndr32 = base.transfer_syntax_ndr()
self.assertIsNone(rep)
self.assertNotConnected()
+ def _prepare_pdu_auth_padding(self, req, before=b"", behind=b"", max_xmit_frag=None):
+ if max_xmit_frag is None:
+ max_xmit_frag = self.max_xmit_frag
+ max_pad_length = max_xmit_frag - req.frag_length
+ total_pad_length = len(before) + len(behind)
+ self.assertLessEqual(total_pad_length, max_pad_length)
+
+ req_pdu_tmp = self.prepare_pdu(req)
+
+ auth_length = dcerpc.DCERPC_AUTH_TRAILER_LENGTH + req.auth_length
+ auth_offset = req.frag_length - auth_length
+ auth_length = dcerpc.DCERPC_AUTH_TRAILER_LENGTH + req.auth_length + len(behind)
+ new_len_fields = struct.pack('<HH',
+ req.frag_length + total_pad_length,
+ req.auth_length + len(behind))
+ auth_blob = req_pdu_tmp[auth_offset:]
+
+ req_pdu = req_pdu_tmp[0:dcerpc.DCERPC_FRAG_LEN_OFFSET]
+ req_pdu += new_len_fields
+ req_pdu += req_pdu_tmp[dcerpc.DCERPC_AUTH_LEN_OFFSET+2:auth_offset]
+ req_pdu += before
+ req_pdu += auth_blob
+ req_pdu += behind
+
+ return req_pdu
+
+ def _add_auth_padding(self, req,
+ mid_padding=False,
+ tail_padding=False,
+ auth_offset_diff=0,
+ max_auth_length=0xffff,
+ max_xmit_frag=None):
+ if max_xmit_frag is None:
+ max_xmit_frag = self.max_xmit_frag
+ auth_length = dcerpc.DCERPC_AUTH_TRAILER_LENGTH + req.auth_length
+ auth_offset = req.frag_length - auth_length
+ max_padding = max_xmit_frag - req.frag_length
+ max_auth_offset = (max_xmit_frag - auth_length) & 0xfffC
+ before_len = max_auth_offset - auth_offset
+ if mid_padding and tail_padding:
+ behind_len = max_padding - before_len
+ elif mid_padding:
+ behind_len = 0
+ elif tail_padding:
+ before_len = 0
+ behind_len = max_padding
+ else:
+ before_len = 0
+ behind_len = 0
+ before_len += auth_offset_diff
+ self.assertGreaterEqual(before_len, 0)
+ if max_auth_length > req.auth_length:
+ max_behind_len = max_auth_length - req.auth_length
+ else:
+ max_behind_len = 0
+ if behind_len > max_behind_len:
+ behind_len = max_behind_len
+ req_pdu = self._prepare_pdu_auth_padding(req,
+ before=b'\xfb' * before_len,
+ behind=b'\xfa' * behind_len,
+ max_xmit_frag=max_xmit_frag)
+ return req_pdu
+
+ def _get_pdu_auth_len(self, req_pdu):
+ al = req_pdu[dcerpc.DCERPC_AUTH_LEN_OFFSET:dcerpc.DCERPC_AUTH_LEN_OFFSET+2]
+ l = struct.unpack('<H', al)
+ return l[0]
+
+ def _test_auth_pad_raw(self,
+ auth_type=None,
+ dcerpc_creds=None,
+ use_auth3=None,
+ use_smb=None,
+ transport_creds=None,
+ bind_mid_padding=False,
+ bind_tail_padding=False,
+ bind_auth_offset_diff=0,
+ bind_max_auth_length=0xffff,
+ third_mid_padding=False,
+ third_tail_padding=False,
+ third_auth_offset_diff=0,
+ third_max_auth_length=0xffff):
+
+ if dcerpc_creds is None:
+ dcerpc_creds = self.get_anon_creds()
+ if transport_creds is None:
+ transport_creds = self.get_anon_creds()
+
+ if use_smb:
+ self.reconnect_smb_pipe(primary_address='\\pipe\\lsarpc',
+ secondary_address='\\pipe\\lsass',
+ transport_creds=transport_creds)
+
+ ndr32 = base.transfer_syntax_ndr()
+
+ tsf1_list = [ndr32]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
+ ctx1.transfer_syntaxes = tsf1_list
+ ctx_list = [ctx1]
+
+ bind_nak_reason = None
+ third_fault_status = None
+ third_fault_flags = 0
+ first_req_fault_status = None
+ first_req_fault_flags = 0
+ max_auth_length = 0xffff
+ if auth_type == dcerpc.DCERPC_AUTH_TYPE_KRB5:
+ expect_3legs = True
+ elif auth_type == dcerpc.DCERPC_AUTH_TYPE_NTLMSSP:
+ expect_3legs = True
+ max_auth_length = 2888
+ elif auth_type == dcerpc.DCERPC_AUTH_TYPE_SPNEGO:
+ expect_3legs = False
+ if third_mid_padding and third_tail_padding:
+ if not use_auth3:
+ third_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED
+ third_fault_flags = dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE
+ else:
+ first_req_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED
+ elif third_mid_padding:
+ if use_auth3:
+ first_req_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED
+ elif third_tail_padding:
+ if not use_auth3:
+ third_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED
+ third_fault_flags = dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE
+ else:
+ third_fault_status = dcerpc.DCERPC_NCA_S_PROTO_ERROR
+ third_fault_flags = dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE
+ else:
+ expect_3legs = False
+
+ g = gensec.Security.start_client(self.settings)
+ g.set_credentials(dcerpc_creds)
+ g.want_feature(gensec.FEATURE_DCE_STYLE)
+ auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT
+ auth_context_id = 2
+ g.start_mech_by_authtype(auth_type, auth_level)
+ from_server = b""
+ (finished, to_server) = g.update(from_server)
+ self.assertFalse(finished)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=to_server)
+ req = self.generate_bind(call_id=1,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ req_pdu = self._add_auth_padding(req,
+ mid_padding=bind_mid_padding,
+ tail_padding=bind_tail_padding,
+ auth_offset_diff=bind_auth_offset_diff,
+ max_auth_length=bind_max_auth_length,
+ max_xmit_frag=0xffff)
+ auth_length = self._get_pdu_auth_len(req_pdu)
+ if bind_auth_offset_diff != 0:
+ bind_nak_reason = dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED
+ elif auth_length > max_auth_length:
+ bind_nak_reason = dcerpc.DCERPC_BIND_NAK_REASON_INVALID_CHECKSUM
+ self.send_pdu_blob(req_pdu)
+ rep = self.recv_pdu()
+ if bind_nak_reason is not None:
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
+ auth_length=0)
+ self.assertEqual(rep.u.reject_reason, bind_nak_reason)
+ self.assertEqual(rep.u.num_versions, 1)
+ self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers)
+ self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
+ self.assertPadding(rep.u._pad, 3)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+ return
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id)
+ self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
+ sda_str = self.secondary_address
+ sda_len = len(sda_str) + 1
+ mod_len = (2 + sda_len) % 4
+ if mod_len != 0:
+ sda_pad = 4 - mod_len
+ else:
+ sda_pad = 0
+ self.assertEqual(rep.u.secondary_address_size, sda_len)
+ self.assertEqual(rep.u.secondary_address, sda_str)
+ self.assertPadding(rep.u._pad1, sda_pad)
+ self.assertEqual(rep.u.num_results, 1)
+ self.assertEqual(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+ self.assertEqual(rep.u.ctx_list[0].reason,
+ dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+ self.assertNotEqual(len(rep.u.auth_info), 0)
+ a = self.parse_auth(rep.u.auth_info)
+
+ from_server = a.credentials
+ (finished, to_server) = g.update(from_server)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=to_server)
+ if expect_3legs:
+ self.assertTrue(finished)
+ else:
+ self.assertFalse(finished)
+ if not use_auth3:
+ req = self.generate_alter(call_id=2,
+ ctx_list=ctx_list,
+ assoc_group_id=rep.u.assoc_group_id,
+ auth_info=auth_info)
+ req_pdu = self._add_auth_padding(req,
+ mid_padding=third_mid_padding,
+ tail_padding=third_tail_padding,
+ auth_offset_diff=third_auth_offset_diff,
+ max_auth_length=third_max_auth_length,
+ max_xmit_frag=0xffff)
+ auth_length = self._get_pdu_auth_len(req_pdu)
+ if third_auth_offset_diff != 0:
+ third_fault_status = dcerpc.DCERPC_NCA_S_PROTO_ERROR
+ third_fault_flags = dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE
+ if auth_length > max_auth_length:
+ third_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED
+ third_fault_flags = dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE
+ self.send_pdu_blob(req_pdu)
+ rep = self.recv_pdu()
+ if third_fault_status is not None:
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id,
+ pfc_flags=req.pfc_flags | third_fault_flags,
+ auth_length=0)
+ self.assertNotEqual(rep.u.alloc_hint, 0)
+ self.assertEqual(rep.u.context_id, 0)
+ self.assertEqual(rep.u.cancel_count, 0)
+ self.assertEqual(rep.u.flags, 0)
+ self.assertEqual(rep.u.status, third_fault_status)
+ self.assertEqual(rep.u.reserved, 0)
+ self.assertEqual(len(rep.u.error_and_verifier), 0)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+ return
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id,
+ pfc_flags=req.pfc_flags)
+ self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
+ self.assertEqual(rep.u.secondary_address_size, 0)
+ self.assertEqual(rep.u.secondary_address, "")
+ self.assertPadding(rep.u._pad1, 2)
+ self.assertEqual(rep.u.num_results, 1)
+ self.assertEqual(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+ self.assertEqual(rep.u.ctx_list[0].reason,
+ dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+ if finished:
+ self.assertEqual(rep.auth_length, 0)
+ else:
+ self.assertNotEqual(rep.auth_length, 0)
+ self.assertGreater(len(rep.u.auth_info), dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
+ self.assertEqual(rep.auth_length, len(rep.u.auth_info) - dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
+ else:
+ req = self.generate_auth3(call_id=2,
+ auth_info=auth_info)
+ req_pdu = self._add_auth_padding(req,
+ mid_padding=third_mid_padding,
+ tail_padding=third_tail_padding,
+ auth_offset_diff=third_auth_offset_diff,
+ max_auth_length=third_max_auth_length)
+ auth_length = self._get_pdu_auth_len(req_pdu)
+ if third_auth_offset_diff != 0:
+ third_fault_status = dcerpc.DCERPC_NCA_S_PROTO_ERROR
+ third_fault_flags = dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE
+ elif not expect_3legs:
+ pass
+ elif auth_length > max_auth_length:
+ first_req_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED
+ self.send_pdu_blob(req_pdu)
+ if third_fault_status is not None:
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id,
+ pfc_flags=req.pfc_flags | third_fault_flags,
+ auth_length=0)
+ self.assertNotEqual(rep.u.alloc_hint, 0)
+ self.assertEqual(rep.u.context_id, 0)
+ self.assertEqual(rep.u.cancel_count, 0)
+ self.assertEqual(rep.u.flags, 0)
+ self.assertEqual(rep.u.status, third_fault_status)
+ self.assertEqual(rep.u.reserved, 0)
+ self.assertEqual(len(rep.u.error_and_verifier), 0)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+ return
+ if self.transport_impersonation is None:
+ rep = self.recv_pdu(timeout=0.01)
+ self.assertIsNone(rep)
+ self.assertIsConnected()
+
+ # And now try a request without auth_info
+ req = self.generate_request(call_id=3,
+ context_id=ctx1.context_id,
+ opnum=0,
+ stub=b"")
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ if first_req_fault_status is not None:
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id,
+ pfc_flags=req.pfc_flags | first_req_fault_flags,
+ auth_length=0)
+ self.assertNotEqual(rep.u.alloc_hint, 0)
+ self.assertEqual(rep.u.context_id, req.u.context_id)
+ self.assertEqual(rep.u.cancel_count, 0)
+ self.assertEqual(rep.u.flags, 0)
+ self.assertEqual(rep.u.status, first_req_fault_status)
+ self.assertEqual(rep.u.reserved, 0)
+ self.assertEqual(len(rep.u.error_and_verifier), 0)
+
+ if first_req_fault_flags & dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE:
+ return
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+ return
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEqual(rep.u.alloc_hint, 0)
+ self.assertEqual(rep.u.context_id, req.u.context_id)
+ self.assertEqual(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ # Now a request with auth_info DCERPC_AUTH_LEVEL_CONNECT
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=b"\x01" + b"\x00" * 15)
+ req = self.generate_request(call_id=4,
+ context_id=ctx1.context_id,
+ opnum=0,
+ stub=b"",
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ # We don't get an auth_info back
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEqual(rep.u.alloc_hint, 0)
+ self.assertEqual(rep.u.context_id, req.u.context_id)
+ self.assertEqual(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ self._disconnect("disconnect")
+ self.assertNotConnected()
+
+ def _test_auth_mid_pad_with_args(self,
+ auth_type,
+ use_auth3,
+ use_smb):
+ return self._test_auth_pad_raw(auth_type=auth_type,
+ use_auth3=use_auth3,
+ use_smb=use_smb,
+ bind_mid_padding=True,
+ third_mid_padding=True)
+
+ def _test_auth_full_pad_with_args(self,
+ auth_type,
+ use_auth3,
+ use_smb):
+ return self._test_auth_pad_raw(auth_type=auth_type,
+ use_auth3=use_auth3,
+ use_smb=use_smb,
+ bind_mid_padding=True,
+ third_mid_padding=True,
+ bind_tail_padding=True,
+ third_tail_padding=True)
+
+ def _test_auth_tail_pad_with_args(self,
+ auth_type,
+ use_auth3,
+ use_smb):
+ return self._test_auth_pad_raw(auth_type=auth_type,
+ use_auth3=use_auth3,
+ use_smb=use_smb,
+ bind_tail_padding=True,
+ third_tail_padding=True)
+
+ def _test_auth_pad_bind_align2_with_args(self,
+ auth_type,
+ use_smb):
+ return self._test_auth_pad_raw(auth_type=auth_type,
+ use_smb=use_smb,
+ bind_auth_offset_diff=2)
+
+ def _test_auth_pad_alter_align2_with_args(self,
+ auth_type,
+ use_smb):
+ return self._test_auth_pad_raw(auth_type=auth_type,
+ use_smb=use_smb,
+ third_auth_offset_diff=2)
+
+ def _test_auth_pad_auth3_align2_with_args(self,
+ auth_type,
+ use_smb):
+ return self._test_auth_pad_raw(auth_type=auth_type,
+ use_smb=use_smb,
+ use_auth3=True,
+ third_auth_offset_diff=2)
+
+ def _test_auth_pad_ntlm_2888_with_args(self,
+ use_auth3,
+ use_smb):
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP
+ return self._test_auth_pad_raw(auth_type=auth_type,
+ use_smb=use_smb,
+ use_auth3=use_auth3,
+ bind_tail_padding=True,
+ bind_max_auth_length=2888,
+ third_tail_padding=True,
+ third_max_auth_length=2888)
+
+ def _test_auth_pad_ntlm_2889_with_args(self,
+ use_auth3,
+ use_smb):
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP
+ return self._test_auth_pad_raw(auth_type=auth_type,
+ use_smb=use_smb,
+ use_auth3=use_auth3,
+ third_tail_padding=True,
+ third_max_auth_length=2889)
+
+ def _test_auth_pad_ntlm_2889_bind_with_args(self,
+ use_smb):
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP
+ return self._test_auth_pad_raw(auth_type=auth_type,
+ use_smb=use_smb,
+ bind_tail_padding=True,
+ bind_max_auth_length=2889)
+
+ @classmethod
+ def _setup_auth_pad_ignored(cls):
+ auth_types = {
+ "spnego": dcerpc.DCERPC_AUTH_TYPE_SPNEGO,
+ "ntlm": dcerpc.DCERPC_AUTH_TYPE_NTLMSSP,
+ }
+ third_methods = {
+ "alter": False,
+ "auth3": True,
+ }
+ transports = {
+ "tcp": False,
+ "smb": True,
+ }
+
+ for auth_type in auth_types.keys():
+ for third_method in third_methods.keys():
+ for transport in transports.keys():
+ tname = "%s_%s_%s" % (
+ auth_type,
+ third_method,
+ transport
+ )
+ targs = (
+ auth_types[auth_type],
+ third_methods[third_method],
+ transports[transport],
+ )
+ cls.generate_dynamic_test("test_auth_mid_pad",
+ tname, *targs)
+ cls.generate_dynamic_test("test_auth_full_pad",
+ tname, *targs)
+ cls.generate_dynamic_test("test_auth_tail_pad",
+ tname, *targs)
+
+ for auth_type in auth_types.keys():
+ for transport in transports.keys():
+ tname = "%s_%s" % (
+ auth_type,
+ transport
+ )
+ targs = (
+ auth_types[auth_type],
+ transports[transport],
+ )
+ cls.generate_dynamic_test("test_auth_pad_bind_align2",
+ tname, *targs)
+ cls.generate_dynamic_test("test_auth_pad_alter_align2",
+ tname, *targs)
+ cls.generate_dynamic_test("test_auth_pad_auth3_align2",
+ tname, *targs)
+
+ for third_method in third_methods.keys():
+ for transport in transports.keys():
+ tname = "%s_%s" % (
+ third_method,
+ transport
+ )
+ targs = (
+ third_methods[third_method],
+ transports[transport],
+ )
+ cls.generate_dynamic_test("test_auth_pad_ntlm_2888",
+ tname, *targs)
+ cls.generate_dynamic_test("test_auth_pad_ntlm_2889",
+ tname, *targs)
+
+ for transport in transports.keys():
+ tname = "%s" % (
+ transport
+ )
+ targs = (
+ transports[transport],
+ )
+ cls.generate_dynamic_test("test_auth_pad_ntlm_2889_bind",
+ tname, *targs)
+
+ return
+
def test_spnego_auth_pad_ok_bind_legacy(self):
ndr32 = base.transfer_syntax_ndr()