import ldb
from ldb import ERR_OPERATIONS_ERROR
import os
+import random
import sys
import struct
import socket
add_rec_buf)
+class TestUDPTruncation(DNSTest):
+ """
+ Samba internal DNS (no EDNS0): if a UDP reply would exceed
+ DNS_MAX_UDP_PACKET_LENGTH (1232) bytes, server should return
+ a small (<512 bytes) truncated response containing only
+ the question (no answers), with TC set.
+ TCP must return the full response (no size limit).
+ """
+
+ def setUp(self):
+ super().setUp()
+ global server, server_ip, lp, creds, timeout
+ self.server = server_name
+ self.server_ip = server_ip
+ self.lp = lp
+ self.creds = creds
+ self.timeout = timeout
+
+ self.rpc_conn = dnsserver.dnsserver(
+ "ncacn_ip_tcp:%s[sign]" % (self.server_ip),
+ self.lp,
+ self.creds
+ )
+
+ def _rpc_add_one_txt(self, fqdn, payload, ttl=900):
+ data = '"\\"%s\\""' % payload
+ rec = record_from_string(dnsp.DNS_TYPE_TXT, data)
+ rec.dwTtlSeconds = ttl
+
+ add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ add_rec_buf.rec = rec
+
+ try:
+ self.rpc_conn.DnssrvUpdateRecord2(
+ dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0,
+ self.server_ip,
+ self.get_dns_domain(),
+ fqdn,
+ add_rec_buf,
+ None
+ )
+ except WERRORError as e:
+ self.fail(str(e))
+ return add_rec_buf # keep for deletion
+
+ def _rpc_del_one_txt(self, fqdn, rec_buf):
+ self.rpc_conn.DnssrvUpdateRecord2(
+ dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0,
+ self.server_ip,
+ self.get_dns_domain(),
+ fqdn,
+ None,
+ rec_buf
+ )
+
+ def _query_txt_udp_raw(self, fqdn):
+ p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+ q = self.make_name_question(fqdn, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
+ self.finish_name_packet(p, [q])
+
+ (resp, raw) = self.dns_transaction_udp(p, host=self.server_ip)
+ return (p, resp, raw)
+
+ def _query_txt_tcp_raw(self, fqdn):
+ p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+ q = self.make_name_question(fqdn, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
+ self.finish_name_packet(p, [q])
+ (resp, raw) = self.dns_transaction_tcp(p, host=self.server_ip)
+ return (p, resp, raw)
+
+ def test_udp_truncates_large_packets(self):
+ uniq = "%08x" % random.getrandbits(32)
+ fqdn = "trunc-%s.%s" % (uniq, self.get_dns_domain())
+
+ rec_bufs = []
+ try:
+ # Create enough large TXT records to force oversized answers.
+ # Keep each TXT <=255 bytes.
+ for i in range(60):
+ payload = ("X" * 230) + ("%08d" % i)
+ rec_bufs.append(self._rpc_add_one_txt(fqdn, payload, ttl=900))
+
+ # UDP query should come back: <512 bytes, TC set, no answers
+ (_req, resp, raw) = self._query_txt_udp_raw(fqdn)
+ self.assertLess(len(raw), 512,
+ "Expected truncated response to be < 512 bytes")
+
+ _id, flags = struct.unpack("!HH", raw[0:4])
+ self.assertTrue((flags & dns.DNS_FLAG_TRUNCATION) != 0,
+ "Expected TC flag set in UDP response")
+
+ self.assert_dns_rcode_equals(resp, dns.DNS_RCODE_OK)
+ self.assertEqual(resp.qdcount, 1)
+ self.assertEqual(resp.ancount, 0)
+ self.assertEqual(resp.nscount, 0)
+ self.assertEqual(resp.arcount, 0)
+
+ # TCP query should return full answer (TC not set)
+ (_req2, resp2, raw2) = self._query_txt_tcp_raw(fqdn)
+ _id2, flags2 = struct.unpack("!HH", raw2[0:4])
+ self.assertTrue((flags2 & dns.DNS_FLAG_TRUNCATION) == 0,
+ "Did not expect TC flag set in TCP response")
+
+ self.assert_dns_rcode_equals(resp2, dns.DNS_RCODE_OK)
+ self.assertGreaterEqual(resp2.ancount, 20,
+ "Expected many TXT answers over TCP")
+ # If even the TCP packet is lower than our threshold,
+ # this test is meaningless
+ self.assertGreater(len(raw2), 1232,
+ "Expected TCP response to exceed 1232 bytes")
+
+ finally:
+ # cleanup
+ for b in rec_bufs:
+ try:
+ self._rpc_del_one_txt(fqdn, b)
+ except Exception:
+ pass
+
+
TestProgram(module=__name__, opts=subunitopts)