From: Andréas Leroux Date: Fri, 13 Feb 2026 08:22:32 +0000 (+0100) Subject: s4/dns_server: add large dns udp truncated packets tests X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d1a309b4e6e7fa24d95e7cf7067ff43dcbb3a070;p=thirdparty%2Fsamba.git s4/dns_server: add large dns udp truncated packets tests Large DNS response must be truncated over UDP, though this is not yet done in samba. Test is added as knownfail until implementation BUG: https://bugzilla.samba.org/show_bug.cgi?id=15988 Signed-off-by: Andréas Leroux Reviewed-by: Douglas Bagnall Reviewed-by: Gary Lockyer --- diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py index a331e26209d..365dfdd6d13 100644 --- a/python/samba/tests/dns.py +++ b/python/samba/tests/dns.py @@ -22,6 +22,7 @@ from samba.auth import system_session import ldb from ldb import ERR_OPERATIONS_ERROR import os +import random import sys import struct import socket @@ -2244,4 +2245,126 @@ class TestRPCRoundtrip(DNSTest): add_rec_buf) +class TestUDPTruncation(DNSTest): + """ + Samba internal DNS (no EDNS0): if a UDP reply would exceed + DNS_MAX_UDP_PACKET_LENGTH (1232) bytes, server should return + a small (<512 bytes) truncated response containing only + the question (no answers), with TC set. + TCP must return the full response (no size limit). + """ + + def setUp(self): + super().setUp() + global server, server_ip, lp, creds, timeout + self.server = server_name + self.server_ip = server_ip + self.lp = lp + self.creds = creds + self.timeout = timeout + + self.rpc_conn = dnsserver.dnsserver( + "ncacn_ip_tcp:%s[sign]" % (self.server_ip), + self.lp, + self.creds + ) + + def _rpc_add_one_txt(self, fqdn, payload, ttl=900): + data = '"\\"%s\\""' % payload + rec = record_from_string(dnsp.DNS_TYPE_TXT, data) + rec.dwTtlSeconds = ttl + + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + + try: + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + fqdn, + add_rec_buf, + None + ) + except WERRORError as e: + self.fail(str(e)) + return add_rec_buf # keep for deletion + + def _rpc_del_one_txt(self, fqdn, rec_buf): + self.rpc_conn.DnssrvUpdateRecord2( + dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server_ip, + self.get_dns_domain(), + fqdn, + None, + rec_buf + ) + + def _query_txt_udp_raw(self, fqdn): + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + q = self.make_name_question(fqdn, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + self.finish_name_packet(p, [q]) + + (resp, raw) = self.dns_transaction_udp(p, host=self.server_ip) + return (p, resp, raw) + + def _query_txt_tcp_raw(self, fqdn): + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + q = self.make_name_question(fqdn, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + self.finish_name_packet(p, [q]) + (resp, raw) = self.dns_transaction_tcp(p, host=self.server_ip) + return (p, resp, raw) + + def test_udp_truncates_large_packets(self): + uniq = "%08x" % random.getrandbits(32) + fqdn = "trunc-%s.%s" % (uniq, self.get_dns_domain()) + + rec_bufs = [] + try: + # Create enough large TXT records to force oversized answers. + # Keep each TXT <=255 bytes. + for i in range(60): + payload = ("X" * 230) + ("%08d" % i) + rec_bufs.append(self._rpc_add_one_txt(fqdn, payload, ttl=900)) + + # UDP query should come back: <512 bytes, TC set, no answers + (_req, resp, raw) = self._query_txt_udp_raw(fqdn) + self.assertLess(len(raw), 512, + "Expected truncated response to be < 512 bytes") + + _id, flags = struct.unpack("!HH", raw[0:4]) + self.assertTrue((flags & dns.DNS_FLAG_TRUNCATION) != 0, + "Expected TC flag set in UDP response") + + self.assert_dns_rcode_equals(resp, dns.DNS_RCODE_OK) + self.assertEqual(resp.qdcount, 1) + self.assertEqual(resp.ancount, 0) + self.assertEqual(resp.nscount, 0) + self.assertEqual(resp.arcount, 0) + + # TCP query should return full answer (TC not set) + (_req2, resp2, raw2) = self._query_txt_tcp_raw(fqdn) + _id2, flags2 = struct.unpack("!HH", raw2[0:4]) + self.assertTrue((flags2 & dns.DNS_FLAG_TRUNCATION) == 0, + "Did not expect TC flag set in TCP response") + + self.assert_dns_rcode_equals(resp2, dns.DNS_RCODE_OK) + self.assertGreaterEqual(resp2.ancount, 20, + "Expected many TXT answers over TCP") + # If even the TCP packet is lower than our threshold, + # this test is meaningless + self.assertGreater(len(raw2), 1232, + "Expected TCP response to exceed 1232 bytes") + + finally: + # cleanup + for b in rec_bufs: + try: + self._rpc_del_one_txt(fqdn, b) + except Exception: + pass + + TestProgram(module=__name__, opts=subunitopts) diff --git a/selftest/knownfail.d/dns b/selftest/knownfail.d/dns index 94000e03baa..998c50cb320 100644 --- a/selftest/knownfail.d/dns +++ b/selftest/knownfail.d/dns @@ -58,6 +58,7 @@ samba.tests.dns.__main__.TestZones.test_basic_scavenging\(vampire_dc:local\) samba.tests.dns.__main__.TestZones.test_dns_tombstone_custom_match_rule\(vampire_dc:local\) samba.tests.dns.__main__.TestZones.test_dynamic_record_static_update\(vampire_dc:local\) samba.tests.dns.__main__.TestZones.test_static_record_dynamic_update\(vampire_dc:local\) +samba.tests.dns.__main__.TestUDPTruncation.test_udp_truncates_large_packets samba.tests.dns.__main__.TestComplexQueries.test_cname_two_chain\(vampire_dc:local\) samba.tests.dns.__main__.TestComplexQueries.test_one_a_query\(vampire_dc:local\)