]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
s4/dns_server: add large dns udp truncated packets tests
authorAndréas Leroux <aleroux@tranquil.it>
Fri, 13 Feb 2026 08:22:32 +0000 (09:22 +0100)
committerJennifer Sutton <jsutton@samba.org>
Wed, 1 Apr 2026 04:05:39 +0000 (04:05 +0000)
Large DNS response must be truncated over UDP, though this is not yet done in samba. Test is added as knownfail until implementation

BUG: https://bugzilla.samba.org/show_bug.cgi?id=15988

Signed-off-by: Andréas Leroux <aleroux@tranquil.it>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Gary Lockyer <gary@catalyst.net.nz>
python/samba/tests/dns.py
selftest/knownfail.d/dns

index a331e26209d496110ae21b325077db7427c16e9c..365dfdd6d13be34fdc9070f53fae402f5595a170 100644 (file)
@@ -22,6 +22,7 @@ from samba.auth import system_session
 import ldb
 from ldb import ERR_OPERATIONS_ERROR
 import os
+import random
 import sys
 import struct
 import socket
@@ -2244,4 +2245,126 @@ class TestRPCRoundtrip(DNSTest):
                 add_rec_buf)
 
 
+class TestUDPTruncation(DNSTest):
+    """
+    Samba internal DNS (no EDNS0): if a UDP reply would exceed
+    DNS_MAX_UDP_PACKET_LENGTH (1232) bytes, server should return
+    a small (<512 bytes) truncated response containing only
+    the question (no answers), with TC set.
+    TCP must return the full response (no size limit).
+    """
+
+    def setUp(self):
+        super().setUp()
+        global server, server_ip, lp, creds, timeout
+        self.server = server_name
+        self.server_ip = server_ip
+        self.lp = lp
+        self.creds = creds
+        self.timeout = timeout
+
+        self.rpc_conn = dnsserver.dnsserver(
+            "ncacn_ip_tcp:%s[sign]" % (self.server_ip),
+            self.lp,
+            self.creds
+        )
+
+    def _rpc_add_one_txt(self, fqdn, payload, ttl=900):
+        data = '"\\"%s\\""' % payload
+        rec = record_from_string(dnsp.DNS_TYPE_TXT, data)
+        rec.dwTtlSeconds = ttl
+
+        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        add_rec_buf.rec = rec
+
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(
+                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                0,
+                self.server_ip,
+                self.get_dns_domain(),
+                fqdn,
+                add_rec_buf,
+                None
+            )
+        except WERRORError as e:
+            self.fail(str(e))
+        return add_rec_buf  # keep for deletion
+
+    def _rpc_del_one_txt(self, fqdn, rec_buf):
+        self.rpc_conn.DnssrvUpdateRecord2(
+            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+            0,
+            self.server_ip,
+            self.get_dns_domain(),
+            fqdn,
+            None,
+            rec_buf
+        )
+
+    def _query_txt_udp_raw(self, fqdn):
+        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+        q = self.make_name_question(fqdn, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
+        self.finish_name_packet(p, [q])
+
+        (resp, raw) = self.dns_transaction_udp(p, host=self.server_ip)
+        return (p, resp, raw)
+
+    def _query_txt_tcp_raw(self, fqdn):
+        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+        q = self.make_name_question(fqdn, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
+        self.finish_name_packet(p, [q])
+        (resp, raw) = self.dns_transaction_tcp(p, host=self.server_ip)
+        return (p, resp, raw)
+
+    def test_udp_truncates_large_packets(self):
+        uniq = "%08x" % random.getrandbits(32)
+        fqdn = "trunc-%s.%s" % (uniq, self.get_dns_domain())
+
+        rec_bufs = []
+        try:
+            # Create enough large TXT records to force oversized answers.
+            # Keep each TXT <=255 bytes.
+            for i in range(60):
+                payload = ("X" * 230) + ("%08d" % i)
+                rec_bufs.append(self._rpc_add_one_txt(fqdn, payload, ttl=900))
+
+            # UDP query should come back: <512 bytes, TC set, no answers
+            (_req, resp, raw) = self._query_txt_udp_raw(fqdn)
+            self.assertLess(len(raw), 512,
+                            "Expected truncated response to be < 512 bytes")
+
+            _id, flags = struct.unpack("!HH", raw[0:4])
+            self.assertTrue((flags & dns.DNS_FLAG_TRUNCATION) != 0,
+                            "Expected TC flag set in UDP response")
+
+            self.assert_dns_rcode_equals(resp, dns.DNS_RCODE_OK)
+            self.assertEqual(resp.qdcount, 1)
+            self.assertEqual(resp.ancount, 0)
+            self.assertEqual(resp.nscount, 0)
+            self.assertEqual(resp.arcount, 0)
+
+            # TCP query should return full answer (TC not set)
+            (_req2, resp2, raw2) = self._query_txt_tcp_raw(fqdn)
+            _id2, flags2 = struct.unpack("!HH", raw2[0:4])
+            self.assertTrue((flags2 & dns.DNS_FLAG_TRUNCATION) == 0,
+                            "Did not expect TC flag set in TCP response")
+
+            self.assert_dns_rcode_equals(resp2, dns.DNS_RCODE_OK)
+            self.assertGreaterEqual(resp2.ancount, 20,
+                                    "Expected many TXT answers over TCP")
+            # If even the TCP packet is lower than our threshold,
+            # this test is meaningless
+            self.assertGreater(len(raw2), 1232,
+                               "Expected TCP response to exceed 1232 bytes")
+
+        finally:
+            # cleanup
+            for b in rec_bufs:
+                try:
+                    self._rpc_del_one_txt(fqdn, b)
+                except Exception:
+                    pass
+
+
 TestProgram(module=__name__, opts=subunitopts)
index 94000e03baa642e1be6125a39f7db96d55ebc060..998c50cb32039b4c00a8d52c9288b61416fe666b 100644 (file)
@@ -58,6 +58,7 @@ samba.tests.dns.__main__.TestZones.test_basic_scavenging\(vampire_dc:local\)
 samba.tests.dns.__main__.TestZones.test_dns_tombstone_custom_match_rule\(vampire_dc:local\)
 samba.tests.dns.__main__.TestZones.test_dynamic_record_static_update\(vampire_dc:local\)
 samba.tests.dns.__main__.TestZones.test_static_record_dynamic_update\(vampire_dc:local\)
+samba.tests.dns.__main__.TestUDPTruncation.test_udp_truncates_large_packets
 
 samba.tests.dns.__main__.TestComplexQueries.test_cname_two_chain\(vampire_dc:local\)
 samba.tests.dns.__main__.TestComplexQueries.test_one_a_query\(vampire_dc:local\)