]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
extend test to cover both incoming UDP and TCP
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Tue, 21 Oct 2025 13:35:12 +0000 (15:35 +0200)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Mon, 10 Nov 2025 14:20:44 +0000 (15:20 +0100)
Signed-off-by: Otto Moerbeek <otto.moerbeek@open-xchange.com>
regression-tests.recursor-dnssec/test_Protobuf.py

index c7e652870b6b26e2924d51d28524969940890a7c..d095c3e1be7144531a749da4759b5b23d8409764 100644 (file)
@@ -381,47 +381,61 @@ logging:
         expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
         query = dns.message.make_query(name, 'A', want_dnssec=True)
         query.flags |= dns.flags.CD
-        res = self.sendUDPQuery(query)
+        for method in ('sendUDPQuery', 'sendTCPQuery'):
+            sender = getattr(self, method)
+            res = sender(query)
 
-        self.assertRRsetInAnswer(res, expected)
+            self.assertRRsetInAnswer(res, expected)
 
-        # check the protobuf messages corresponding to the UDP query and answer
-        msg = self.getFirstProtobufMessage()
-        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
-        # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
-        self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
-        # then the response
-        msg = self.getFirstProtobufMessage()
-        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1')
-        self.assertEqual(len(msg.response.rrs), 1)
-        rr = msg.response.rrs[0]
-        # we have max-cache-ttl set to 15
-        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
-        self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-        self.checkNoRemainingMessage()
+            # check the protobuf messages corresponding to the query and answer
+            msg = self.getFirstProtobufMessage()
+            if method == 'sendUDPQuery':
+                protocol = dnsmessage_pb2.PBDNSMessage.UDP
+            else:
+                protocol = dnsmessage_pb2.PBDNSMessage.TCP
+            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+            # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
+            self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
+            # then the response
+            msg = self.getFirstProtobufMessage()
+            self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
+            self.assertEqual(len(msg.response.rrs), 1)
+            rr = msg.response.rrs[0]
+            # we have max-cache-ttl set to 15
+            if method == 'sendUDPQuery':
+                self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+            self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
+            self.checkNoRemainingMessage()
         #
         # again, for a PC cache hit
         #
-        res = self.sendUDPQuery(query)
+        for method in ('sendUDPQuery', 'sendTCPQuery'):
+            sender = getattr(self, method)
+            res = sender(query)
 
-        self.assertRRsetInAnswer(res, expected)
+            self.assertRRsetInAnswer(res, expected)
 
-        # check the protobuf messages corresponding to the UDP query and answer
-        msg = self.getFirstProtobufMessage()
-        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
-        # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
-        self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
-        # then the response
-        msg = self.getFirstProtobufMessage()
-        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1')
-        self.assertEqual(len(msg.response.rrs), 1)
-        rr = msg.response.rrs[0]
-        # we have max-cache-ttl set to 15
-        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
-        self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-        self.checkProtobufOT(msg, True, True)
-        self.checkProtobufEDE(msg, 0, '')
-        self.checkNoRemainingMessage()
+            # check the protobuf messages corresponding to the UDP query and answer
+            msg = self.getFirstProtobufMessage()
+            if method == 'sendUDPQuery':
+                protocol = dnsmessage_pb2.PBDNSMessage.UDP
+            else:
+                protocol = dnsmessage_pb2.PBDNSMessage.TCP
+            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+            # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
+            self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
+            # then the response
+            msg = self.getFirstProtobufMessage()
+            self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
+            self.assertEqual(len(msg.response.rrs), 1)
+            rr = msg.response.rrs[0]
+            # we have max-cache-ttl set to 15
+            if method == 'sendUDPQuery':
+                self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+            self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
+            self.checkProtobufOT(msg, True, True)
+            self.checkProtobufEDE(msg, 0, '')
+            self.checkNoRemainingMessage()
 
     def testCNAME(self):
         name = 'cname.example.'