From: Stefan Metzmacher Date: Wed, 26 Mar 2014 21:42:19 +0000 (+0100) Subject: CVE-2015-5370: python/samba/tests: add infrastructure to do raw protocol tests for... X-Git-Tag: samba-4.2.10~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=df411cbdb4a83443b03efa102ff46ad8043ed985;p=thirdparty%2Fsamba.git CVE-2015-5370: python/samba/tests: add infrastructure to do raw protocol tests for DCERPC These are independent from our client library and allow testing of invalid pdus. BUG: https://bugzilla.samba.org/show_bug.cgi?id=11344 Signed-off-by: Stefan Metzmacher --- diff --git a/python/samba/tests/__init__.py b/python/samba/tests/__init__.py index b53c4ea027f..0499b4e0fc3 100644 --- a/python/samba/tests/__init__.py +++ b/python/samba/tests/__init__.py @@ -1,5 +1,6 @@ # Unix SMB/CIFS implementation. # Copyright (C) Jelmer Vernooij 2007-2010 +# Copyright (C) Stefan Metzmacher 2014,2015 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -24,6 +25,12 @@ import samba.auth from samba import param from samba.samdb import SamDB from samba import credentials +import samba.ndr +import samba.dcerpc.dcerpc +import samba.dcerpc.base +import samba.dcerpc.epmapper +import socket +import struct import subprocess import sys import tempfile @@ -222,6 +229,524 @@ cmdline_credentials = None class RpcInterfaceTestCase(TestCase): """DCE/RPC Test case.""" +class RawDCERPCTest(TestCase): + """A raw DCE/RPC Test case.""" + + def _disconnect(self, reason): + if self.s is None: + return + self.s.close() + self.s = None + if self.do_hexdump: + sys.stderr.write("disconnect[%s]\n" % reason) + + def connect(self): + try: + self.a = socket.getaddrinfo(self.host, self.tcp_port, socket.AF_UNSPEC, + socket.SOCK_STREAM, socket.SOL_TCP, + 0) + self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2]) + self.s.settimeout(10) + self.s.connect(self.a[0][4]) + except socket.error as e: + self.s.close() + raise + except IOError as e: + self.s.close() + raise + except Exception as e: + raise + finally: + pass + + def setUp(self): + super(RawDCERPCTest, self).setUp() + self.do_ndr_print = False + self.do_hexdump = False + + self.host = samba.tests.env_get_var_value('SERVER') + self.tcp_port = 135 + + self.settings = {} + self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm() + self.settings["target_hostname"] = self.host + + self.connect() + + def epmap_reconnect(self, abstract): + ndr32 = samba.dcerpc.base.transfer_syntax_ndr() + + tsf0_list = [ndr32] + ctx0 = samba.dcerpc.dcerpc.ctx_list() + ctx0.context_id = 1 + ctx0.num_transfer_syntaxes = len(tsf0_list) + ctx0.abstract_syntax = samba.dcerpc.epmapper.abstract_syntax() + ctx0.transfer_syntaxes = tsf0_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx0]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK, + req.call_id, auth_length=0) + self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEqual(rep.u.secondary_address_size, 4) + self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertEqual(len(rep.u._pad1), 2) + self.assertEqual(rep.u._pad1, '\0' * 2) + self.assertEqual(rep.u.num_results, 1) + self.assertEqual(rep.u.ctx_list[0].result, + samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEqual(rep.u.ctx_list[0].reason, + samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEqual(rep.u.auth_info, '\0' * 0) + + # And now try a request + data1 = samba.ndr.ndr_pack(abstract) + lhs1 = samba.dcerpc.epmapper.epm_lhs() + lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID + lhs1.lhs_data = data1[:18] + rhs1 = samba.dcerpc.epmapper.epm_rhs_uuid() + rhs1.unknown = data1[18:] + floor1 = samba.dcerpc.epmapper.epm_floor() + floor1.lhs = lhs1 + floor1.rhs = rhs1 + data2 = samba.ndr.ndr_pack(ndr32) + lhs2 = samba.dcerpc.epmapper.epm_lhs() + lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID + lhs2.lhs_data = data2[:18] + rhs2 = samba.dcerpc.epmapper.epm_rhs_uuid() + rhs2.unknown = data1[18:] + floor2 = samba.dcerpc.epmapper.epm_floor() + floor2.lhs = lhs2 + floor2.rhs = rhs2 + lhs3 = samba.dcerpc.epmapper.epm_lhs() + lhs3.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_NCACN + lhs3.lhs_data = "" + floor3 = samba.dcerpc.epmapper.epm_floor() + floor3.lhs = lhs3 + floor3.rhs.minor_version = 0 + lhs4 = samba.dcerpc.epmapper.epm_lhs() + lhs4.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_TCP + lhs4.lhs_data = "" + floor4 = samba.dcerpc.epmapper.epm_floor() + floor4.lhs = lhs4 + floor4.rhs.port = self.tcp_port + lhs5 = samba.dcerpc.epmapper.epm_lhs() + lhs5.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_IP + lhs5.lhs_data = "" + floor5 = samba.dcerpc.epmapper.epm_floor() + floor5.lhs = lhs5 + floor5.rhs.ipaddr = "0.0.0.0" + + floors = [floor1,floor2,floor3,floor4,floor5] + req_tower = samba.dcerpc.epmapper.epm_tower() + req_tower.num_floors = len(floors) + req_tower.floors = floors + req_twr = samba.dcerpc.epmapper.epm_twr_t() + req_twr.tower = req_tower + + pack_twr = samba.ndr.ndr_pack(req_twr) + + # object + stub = "\x01\x00\x00\x00" + stub += "\x00" * 16 + # tower + stub += "\x02\x00\x00\x00" + stub += pack_twr + # padding? + stub += "\x00" * 1 + # handle + stub += "\x00" * 20 + # max_towers + stub += "\x04\x00\x00\x00" + + # we do an epm_Map() request + req = self.generate_request(call_id = 1, + context_id=ctx0.context_id, + opnum=3, + stub=stub) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, + req.call_id, auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, req.u.context_id) + self.assertEqual(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + num_towers = struct.unpack_from(" samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH: + p.auth_length = len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH + else: + p.auth_length = 0 + p.call_id = call_id + p.u = payload + + pdu = samba.ndr.ndr_pack(p) + p.frag_length = len(pdu) + + return p + + def verify_pdu(self, p, ptype, call_id, + rpc_vers=5, + rpc_vers_minor=0, + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + drep = [samba.dcerpc.dcerpc.DCERPC_DREP_LE, 0, 0, 0], + auth_length=None): + + self.assertIsNotNone(p, "No valid pdu") + + if getattr(p.u, 'auth_info', None): + ai = p.u.auth_info + else: + ai = "" + + self.assertEqual(p.rpc_vers, rpc_vers) + self.assertEqual(p.rpc_vers_minor, rpc_vers_minor) + self.assertEqual(p.ptype, ptype) + self.assertEqual(p.pfc_flags, pfc_flags) + self.assertEqual(p.drep, drep) + self.assertGreaterEqual(p.frag_length, + samba.dcerpc.dcerpc.DCERPC_NCACN_PAYLOAD_OFFSET) + if len(ai) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH: + self.assertEqual(p.auth_length, + len(ai) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH) + elif auth_length is not None: + self.assertEqual(p.auth_length, auth_length) + else: + self.assertEqual(p.auth_length, 0) + self.assertEqual(p.call_id, call_id) + + return + + def generate_bind(self, call_id, + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + max_xmit_frag=5840, + max_recv_frag=5840, + assoc_group_id=0, + ctx_list=[], + auth_info="", + ndr_print=None, hexdump=None): + + b = samba.dcerpc.dcerpc.bind() + b.max_xmit_frag = max_xmit_frag + b.max_recv_frag = max_recv_frag + b.assoc_group_id = assoc_group_id + b.num_contexts = len(ctx_list) + b.ctx_list = ctx_list + b.auth_info = auth_info + + p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_BIND, + pfc_flags=pfc_flags, + call_id=call_id, + payload=b, + ndr_print=ndr_print, hexdump=hexdump) + + return p + + def generate_alter(self, call_id, + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + max_xmit_frag=5840, + max_recv_frag=5840, + assoc_group_id=0, + ctx_list=[], + auth_info="", + ndr_print=None, hexdump=None): + + a = samba.dcerpc.dcerpc.bind() + a.max_xmit_frag = max_xmit_frag + a.max_recv_frag = max_recv_frag + a.assoc_group_id = assoc_group_id + a.num_contexts = len(ctx_list) + a.ctx_list = ctx_list + a.auth_info = auth_info + + p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ALTER, + pfc_flags=pfc_flags, + call_id=call_id, + payload=a, + ndr_print=ndr_print, hexdump=hexdump) + + return p + + def generate_auth3(self, call_id, + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + auth_info="", + ndr_print=None, hexdump=None): + + a = samba.dcerpc.dcerpc.auth3() + a.auth_info = auth_info + + p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_AUTH3, + pfc_flags=pfc_flags, + call_id=call_id, + payload=a, + ndr_print=ndr_print, hexdump=hexdump) + + return p + + def generate_request(self, call_id, + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + alloc_hint=None, + context_id=None, + opnum=None, + object=None, + stub=None, + auth_info="", + ndr_print=None, hexdump=None): + + if alloc_hint is None: + alloc_hint = len(stub) + + r = samba.dcerpc.dcerpc.request() + r.alloc_hint = alloc_hint + r.context_id = context_id + r.opnum = opnum + if object is not None: + r.object = object + r.stub_and_verifier = stub + auth_info + + p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_REQUEST, + pfc_flags=pfc_flags, + call_id=call_id, + payload=r, + ndr_print=ndr_print, hexdump=hexdump) + + if len(auth_info) > samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH: + p.auth_length = len(auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH + + return p + + def generate_co_cancel(self, call_id, + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + auth_info="", + ndr_print=None, hexdump=None): + + c = samba.dcerpc.dcerpc.co_cancel() + c.auth_info = auth_info + + p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_CO_CANCEL, + pfc_flags=pfc_flags, + call_id=call_id, + payload=c, + ndr_print=ndr_print, hexdump=hexdump) + + return p + + def generate_orphaned(self, call_id, + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + auth_info="", + ndr_print=None, hexdump=None): + + o = samba.dcerpc.dcerpc.orphaned() + o.auth_info = auth_info + + p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_ORPHANED, + pfc_flags=pfc_flags, + call_id=call_id, + payload=o, + ndr_print=ndr_print, hexdump=hexdump) + + return p + + def generate_shutdown(self, call_id, + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST | + samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST, + ndr_print=None, hexdump=None): + + s = samba.dcerpc.dcerpc.shutdown() + + p = self.generate_pdu(ptype=samba.dcerpc.dcerpc.DCERPC_PKT_SHUTDOWN, + pfc_flags=pfc_flags, + call_id=call_id, + payload=s, + ndr_print=ndr_print, hexdump=hexdump) + + return p + + def assertIsConnected(self): + self.assertIsNotNone(self.s, msg="Not connected") + return + + def assertNotConnected(self): + self.assertIsNone(self.s, msg="Is connected") + return + + def assertNDRSyntaxEquals(self, s1, s2): + self.assertEqual(s1.uuid, s2.uuid) + self.assertEqual(s1.if_version, s2.if_version) + return class ValidNetbiosNameTests(TestCase):