From 1ec4c8b97598aa70f36b1a01854e04b87bca2612 Mon Sep 17 00:00:00 2001 From: Otto Moerbeek Date: Tue, 21 Oct 2025 15:35:12 +0200 Subject: [PATCH] extend test to cover both incoming UDP and TCP Signed-off-by: Otto Moerbeek --- .../test_Protobuf.py | 82 +++++++++++-------- 1 file changed, 48 insertions(+), 34 deletions(-) diff --git a/regression-tests.recursor-dnssec/test_Protobuf.py b/regression-tests.recursor-dnssec/test_Protobuf.py index c7e652870b..d095c3e1be 100644 --- a/regression-tests.recursor-dnssec/test_Protobuf.py +++ b/regression-tests.recursor-dnssec/test_Protobuf.py @@ -381,47 +381,61 @@ logging: expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') query = dns.message.make_query(name, 'A', want_dnssec=True) query.flags |= dns.flags.CD - res = self.sendUDPQuery(query) + for method in ('sendUDPQuery', 'sendTCPQuery'): + sender = getattr(self, method) + res = sender(query) - self.assertRRsetInAnswer(res, expected) + self.assertRRsetInAnswer(res, expected) - # check the protobuf messages corresponding to the UDP query and answer - msg = self.getFirstProtobufMessage() - self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name) - # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version - self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) - # then the response - msg = self.getFirstProtobufMessage() - self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1') - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - # we have max-cache-ttl set to 15 - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15) - self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkNoRemainingMessage() + # check the protobuf messages corresponding to the query and answer + msg = self.getFirstProtobufMessage() + if method == 'sendUDPQuery': + protocol = dnsmessage_pb2.PBDNSMessage.UDP + else: + protocol = dnsmessage_pb2.PBDNSMessage.TCP + self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name) + # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version + self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) + # then the response + msg = self.getFirstProtobufMessage() + self.checkProtobufResponse(msg, protocol, res, '127.0.0.1') + self.assertEqual(len(msg.response.rrs), 1) + rr = msg.response.rrs[0] + # we have max-cache-ttl set to 15 + if method == 'sendUDPQuery': + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15) + self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') + self.checkNoRemainingMessage() # # again, for a PC cache hit # - res = self.sendUDPQuery(query) + for method in ('sendUDPQuery', 'sendTCPQuery'): + sender = getattr(self, method) + res = sender(query) - self.assertRRsetInAnswer(res, expected) + self.assertRRsetInAnswer(res, expected) - # check the protobuf messages corresponding to the UDP query and answer - msg = self.getFirstProtobufMessage() - self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name) - # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version - self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) - # then the response - msg = self.getFirstProtobufMessage() - self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1') - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - # we have max-cache-ttl set to 15 - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15) - self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkProtobufOT(msg, True, True) - self.checkProtobufEDE(msg, 0, '') - self.checkNoRemainingMessage() + # check the protobuf messages corresponding to the UDP query and answer + msg = self.getFirstProtobufMessage() + if method == 'sendUDPQuery': + protocol = dnsmessage_pb2.PBDNSMessage.UDP + else: + protocol = dnsmessage_pb2.PBDNSMessage.TCP + self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name) + # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version + self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) + # then the response + msg = self.getFirstProtobufMessage() + self.checkProtobufResponse(msg, protocol, res, '127.0.0.1') + self.assertEqual(len(msg.response.rrs), 1) + rr = msg.response.rrs[0] + # we have max-cache-ttl set to 15 + if method == 'sendUDPQuery': + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15) + self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') + self.checkProtobufOT(msg, True, True) + self.checkProtobufEDE(msg, 0, '') + self.checkNoRemainingMessage() def testCNAME(self): name = 'cname.example.' -- 2.47.3