]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
tests/dcerpc/raw_protocol: add more test for auth padding during ALTER_CONTEXT/AUTH3
authorStefan Metzmacher <metze@samba.org>
Tue, 24 Sep 2024 07:56:05 +0000 (09:56 +0200)
committerAndreas Schneider <asn@cryptomilk.org>
Thu, 10 Oct 2024 14:01:04 +0000 (14:01 +0000)
The aim is to keep testing the code paths, which are no longer
testing because allow_bind_auth_pad is false now, which
means the existing tests fail directly at the BIND,
but we also want to test the error handling on
ALTER_CONTEXT (and AUTH3).

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14356

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andreas Schneider <asn@samba.org>
python/samba/tests/dcerpc/raw_protocol.py

index d5cd10d724ab9272b053df5e24e4d72ee0644d05..d10f767371b98401ea8ec368c57edbce80911b77 100755 (executable)
@@ -5170,7 +5170,7 @@ class TestDCERPC_BIND(RawDCERPCTest):
         self.assertIsNone(rep)
         self.assertNotConnected()
 
-    def test_spnego_auth_pad_ok(self):
+    def test_spnego_auth_pad_ok_bind_legacy(self):
         ndr32 = base.transfer_syntax_ndr()
 
         tsf1_list = [ndr32]
@@ -5335,7 +5335,171 @@ class TestDCERPC_BIND(RawDCERPCTest):
         self._disconnect("disconnect")
         self.assertNotConnected()
 
-    def test_spnego_auth_pad_fail_bind(self):
+    def test_spnego_auth_pad_ok_alter_legacy(self):
+        ndr32 = base.transfer_syntax_ndr()
+
+        tsf1_list = [ndr32]
+        ctx1 = dcerpc.ctx_list()
+        ctx1.context_id = 1
+        ctx1.num_transfer_syntaxes = len(tsf1_list)
+        ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
+        ctx1.transfer_syntaxes = tsf1_list
+        ctx_list = [ctx1]
+
+        c = self.get_anon_creds()
+        g = gensec.Security.start_client(self.settings)
+        g.set_credentials(c)
+        g.want_feature(gensec.FEATURE_DCE_STYLE)
+        auth_type = dcerpc.DCERPC_AUTH_TYPE_SPNEGO
+        auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT
+        auth_context_id = 2
+        g.start_mech_by_authtype(auth_type, auth_level)
+        from_server = b""
+        (finished, to_server) = g.update(from_server)
+        self.assertFalse(finished)
+
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_blob=to_server)
+
+        req = self.generate_bind(call_id=0,
+                                 ctx_list=ctx_list,
+                                 auth_info=auth_info)
+        req_pdu = samba.ndr.ndr_pack(req)
+
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_pad_length=0,
+                                       auth_blob=to_server)
+
+        req = self.generate_bind(call_id=0,
+                                 ctx_list=ctx_list,
+                                 auth_info=auth_info)
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id)
+        self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+        self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
+        self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
+        self.assertEqual(rep.u.secondary_address_size, 4)
+        self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port)
+        self.assertPadding(rep.u._pad1, 2)
+        self.assertEqual(rep.u.num_results, 1)
+        self.assertEqual(rep.u.ctx_list[0].result,
+                          dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+        self.assertEqual(rep.u.ctx_list[0].reason,
+                          dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+        self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+        self.assertNotEqual(len(rep.u.auth_info), 0)
+        a = self.parse_auth(rep.u.auth_info)
+
+        from_server = a.credentials
+        (finished, to_server) = g.update(from_server)
+        self.assertFalse(finished)
+
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_blob=to_server)
+        req = self.generate_alter(call_id=0,
+                                  ctx_list=ctx_list,
+                                  assoc_group_id=rep.u.assoc_group_id,
+                                  auth_info=auth_info)
+        req_pdu = samba.ndr.ndr_pack(req)
+
+        auth_pad_ok = len(req_pdu)
+        auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH
+        auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+        auth_pad_ok -= len(to_server)
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_pad_length=auth_pad_ok,
+                                       auth_blob=to_server)
+        req = self.generate_alter(call_id=0,
+                                  ctx_list=ctx_list,
+                                  assoc_group_id=rep.u.assoc_group_id,
+                                  auth_info=auth_info)
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        if not self.allow_bind_auth_pad:
+            # modern server (e.g. 2022)
+            self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id,
+                            pfc_flags=req.pfc_flags |
+                            dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
+                            auth_length=0)
+            self.assertNotEqual(rep.u.alloc_hint, 0)
+            self.assertEqual(rep.u.context_id, 0)
+            self.assertEqual(rep.u.cancel_count, 0)
+            self.assertEqual(rep.u.flags, 0)
+            self.assertEqual(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR)
+            self.assertEqual(rep.u.reserved, 0)
+            self.assertEqual(len(rep.u.error_and_verifier), 0)
+
+            # wait for a disconnect
+            rep = self.recv_pdu()
+            self.assertIsNone(rep)
+            self.assertNotConnected()
+            return
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id)
+        self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+        self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
+        self.assertEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
+        self.assertEqual(rep.u.secondary_address_size, 0)
+        self.assertPadding(rep.u._pad1, 2)
+        self.assertEqual(rep.u.num_results, 1)
+        self.assertEqual(rep.u.ctx_list[0].result,
+                          dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+        self.assertEqual(rep.u.ctx_list[0].reason,
+                          dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+        self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+        self.assertNotEqual(len(rep.u.auth_info), 0)
+        a = self.parse_auth(rep.u.auth_info)
+
+        from_server = a.credentials
+        (finished, to_server) = g.update(from_server)
+        self.assertTrue(finished)
+
+        # And now try a request without auth_info
+        req = self.generate_request(call_id=2,
+                                    context_id=ctx1.context_id,
+                                    opnum=0,
+                                    stub=b"")
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+                        auth_length=0)
+        self.assertNotEqual(rep.u.alloc_hint, 0)
+        self.assertEqual(rep.u.context_id, req.u.context_id)
+        self.assertEqual(rep.u.cancel_count, 0)
+        self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+        # Now a request with auth_info DCERPC_AUTH_LEVEL_CONNECT
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_blob=b"\x01" + b"\x00" * 15)
+        req = self.generate_request(call_id=3,
+                                    context_id=ctx1.context_id,
+                                    opnum=0,
+                                    stub=b"",
+                                    auth_info=auth_info)
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        # We don't get an auth_info back
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+                        auth_length=0)
+        self.assertNotEqual(rep.u.alloc_hint, 0)
+        self.assertEqual(rep.u.context_id, req.u.context_id)
+        self.assertEqual(rep.u.cancel_count, 0)
+        self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+        self._disconnect("disconnect")
+        self.assertNotConnected()
+
+    def test_spnego_auth_pad_fail_bind_legacy(self):
         ndr32 = base.transfer_syntax_ndr()
 
         tsf1_list = [ndr32]
@@ -5526,7 +5690,7 @@ class TestDCERPC_BIND(RawDCERPCTest):
         self.assertIsNone(rep)
         self.assertNotConnected()
 
-    def test_ntlmssp_auth_pad_ok(self):
+    def test_ntlmssp_auth_pad_ok_auth3(self):
         ndr32 = base.transfer_syntax_ndr()
 
         tsf1_list = [ndr32]
@@ -5693,6 +5857,208 @@ class TestDCERPC_BIND(RawDCERPCTest):
                                  auth_info=auth_info)
         req_pdu = samba.ndr.ndr_pack(req)
 
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_pad_length=0,
+                                       auth_blob=to_server)
+
+        req = self.generate_bind(call_id=0,
+                                 ctx_list=ctx_list,
+                                 auth_info=auth_info)
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id)
+        self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+        self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
+        self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
+        self.assertEqual(rep.u.secondary_address_size, 4)
+        self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port)
+        self.assertPadding(rep.u._pad1, 2)
+        self.assertEqual(rep.u.num_results, 1)
+        self.assertEqual(rep.u.ctx_list[0].result,
+                          dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+        self.assertEqual(rep.u.ctx_list[0].reason,
+                          dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+        self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+        self.assertNotEqual(len(rep.u.auth_info), 0)
+        a = self.parse_auth(rep.u.auth_info)
+
+        from_server = a.credentials
+        (finished, to_server) = g.update(from_server)
+        self.assertTrue(finished)
+
+        auth_pad_bad = 1
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_pad_length=auth_pad_bad,
+                                       auth_blob=to_server)
+        req = self.generate_auth3(call_id=0,
+                                  auth_info=auth_info)
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id,
+                        pfc_flags=req.pfc_flags |
+                        dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
+                        auth_length=0)
+        self.assertNotEqual(rep.u.alloc_hint, 0)
+        self.assertEqual(rep.u.context_id, 0)
+        self.assertEqual(rep.u.cancel_count, 0)
+        self.assertEqual(rep.u.flags, 0)
+        self.assertEqual(rep.u.status, dcerpc.DCERPC_NCA_S_FAULT_REMOTE_NO_MEMORY)
+        self.assertEqual(rep.u.reserved, 0)
+        self.assertEqual(len(rep.u.error_and_verifier), 0)
+
+        # wait for a disconnect
+        rep = self.recv_pdu()
+        self.assertIsNone(rep)
+        self.assertNotConnected()
+
+    def test_ntlmssp_auth_pad_fail_bind_legacy(self):
+        ndr32 = base.transfer_syntax_ndr()
+
+        tsf1_list = [ndr32]
+        ctx1 = dcerpc.ctx_list()
+        ctx1.context_id = 1
+        ctx1.num_transfer_syntaxes = len(tsf1_list)
+        ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
+        ctx1.transfer_syntaxes = tsf1_list
+        ctx_list = [ctx1]
+
+        c = self.get_anon_creds()
+        g = gensec.Security.start_client(self.settings)
+        g.set_credentials(c)
+        g.want_feature(gensec.FEATURE_DCE_STYLE)
+        auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP
+        auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT
+        auth_context_id = 2
+        g.start_mech_by_authtype(auth_type, auth_level)
+        from_server = b""
+        (finished, to_server) = g.update(from_server)
+        self.assertFalse(finished)
+
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_blob=to_server)
+
+        req = self.generate_bind(call_id=0,
+                                 ctx_list=ctx_list,
+                                 auth_info=auth_info)
+        req_pdu = samba.ndr.ndr_pack(req)
+
+        auth_pad_ok = len(req_pdu)
+        auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH
+        auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+        auth_pad_ok -= len(to_server)
+
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_pad_length=auth_pad_ok,
+                                       auth_blob=to_server)
+
+        req = self.generate_bind(call_id=0,
+                                 ctx_list=ctx_list,
+                                 auth_info=auth_info)
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        if not self.allow_bind_auth_pad:
+            # modern server (e.g. 2022)
+            self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
+                            auth_length=0)
+            self.assertEqual(rep.u.reject_reason,
+                              dcerpc.DCERPC_BIND_NAK_REASON_PROTOCOL_VERSION_NOT_SUPPORTED)
+            self.assertEqual(rep.u.num_versions, 1)
+            self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers)
+            self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
+            self.assertPadding(rep.u._pad, 3)
+            # wait for a disconnect
+            rep = self.recv_pdu()
+            self.assertIsNone(rep)
+            self.assertNotConnected()
+            return
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id)
+        self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+        self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
+        self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
+        self.assertEqual(rep.u.secondary_address_size, 4)
+        self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port)
+        self.assertPadding(rep.u._pad1, 2)
+        self.assertEqual(rep.u.num_results, 1)
+        self.assertEqual(rep.u.ctx_list[0].result,
+                          dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+        self.assertEqual(rep.u.ctx_list[0].reason,
+                          dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+        self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+        self.assertNotEqual(len(rep.u.auth_info), 0)
+        a = self.parse_auth(rep.u.auth_info)
+
+        from_server = a.credentials
+        (finished, to_server) = g.update(from_server)
+        self.assertTrue(finished)
+
+        auth_pad_bad = 1
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_pad_length=auth_pad_bad,
+                                       auth_blob=to_server)
+        req = self.generate_auth3(call_id=0,
+                                  auth_info=auth_info)
+        self.send_pdu(req)
+        rep = self.recv_pdu()
+        self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id,
+                        pfc_flags=req.pfc_flags |
+                        dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
+                        auth_length=0)
+        self.assertNotEqual(rep.u.alloc_hint, 0)
+        self.assertEqual(rep.u.context_id, 0)
+        self.assertEqual(rep.u.cancel_count, 0)
+        self.assertEqual(rep.u.flags, 0)
+        self.assertEqual(rep.u.status, dcerpc.DCERPC_NCA_S_FAULT_REMOTE_NO_MEMORY)
+        self.assertEqual(rep.u.reserved, 0)
+        self.assertEqual(len(rep.u.error_and_verifier), 0)
+
+        # wait for a disconnect
+        rep = self.recv_pdu()
+        self.assertIsNone(rep)
+        self.assertNotConnected()
+
+    def test_ntlmssp_auth_pad_fail_auth3_lagacy(self):
+        ndr32 = base.transfer_syntax_ndr()
+
+        tsf1_list = [ndr32]
+        ctx1 = dcerpc.ctx_list()
+        ctx1.context_id = 1
+        ctx1.num_transfer_syntaxes = len(tsf1_list)
+        ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
+        ctx1.transfer_syntaxes = tsf1_list
+        ctx_list = [ctx1]
+
+        c = self.get_anon_creds()
+        g = gensec.Security.start_client(self.settings)
+        g.set_credentials(c)
+        g.want_feature(gensec.FEATURE_DCE_STYLE)
+        auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP
+        auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT
+        auth_context_id = 2
+        g.start_mech_by_authtype(auth_type, auth_level)
+        from_server = b""
+        (finished, to_server) = g.update(from_server)
+        self.assertFalse(finished)
+
+        auth_info = self.generate_auth(auth_type=auth_type,
+                                       auth_level=auth_level,
+                                       auth_context_id=auth_context_id,
+                                       auth_blob=to_server)
+
+        req = self.generate_bind(call_id=0,
+                                 ctx_list=ctx_list,
+                                 auth_info=auth_info)
+        req_pdu = samba.ndr.ndr_pack(req)
+
         auth_pad_ok = len(req_pdu)
         auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH
         auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH