From ed087bf50055ea474b508310deb28ef4a045d070 Mon Sep 17 00:00:00 2001 From: Otto Moerbeek Date: Wed, 12 Nov 2025 10:15:13 +0100 Subject: [PATCH] Refactor common code in tests and add a few tests Signed-off-by: Otto Moerbeek --- .../test_Protobuf.py | 364 +++++------------- 1 file changed, 105 insertions(+), 259 deletions(-) diff --git a/regression-tests.recursor-dnssec/test_Protobuf.py b/regression-tests.recursor-dnssec/test_Protobuf.py index 6397ff1c8a..3e27227c71 100644 --- a/regression-tests.recursor-dnssec/test_Protobuf.py +++ b/regression-tests.recursor-dnssec/test_Protobuf.py @@ -290,7 +290,7 @@ class TestRecursorProtobuf(RecursorTest): self.assertEqual(msg.deviceName, deviceName) def checkProtobufEDE(self, msg, ede, edeText): - print(msg) + #print(msg) self.assertTrue((ede == 0) == (not msg.HasField('ede'))) self.assertTrue((edeText == '') == (not msg.HasField('edeText'))) self.assertEqual(msg.ede, ede) @@ -383,36 +383,18 @@ logging: def generateRecursorConfig(cls, confdir): super(ProtobufDefaultTest, cls).generateRecursorYamlConfig(confdir, False) - def reloadConfig(self, config): - confdir = os.path.join('configs', ProtobufDefaultTest._confdir) - ProtobufDefaultTest._config_template = config - ProtobufDefaultTest.generateRecursorYamlConfig(confdir, False) - ProtobufDefaultTest.recControl(confdir, 'reload-yaml') - - config_default = """ -recursor: - auth_zones: - - zone: example - file: configs/%s/example.zone - event_trace_enabled: 4 -logging: - protobuf_servers: - - servers: [127.0.0.1:%s, 127.0.0.1:%s] - opentelemetry_trace_conditions: - - acls: ['0.0.0.0/0'] -""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port) - - def testADefault(self): - self.reloadConfig(self.config_default) - name = 'a.example.' - expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') - query = dns.message.make_query(name, 'A', want_dnssec=True) + def runtest(self, name, dnstype, content, trace, traceid, edns=None, af=socket.AF_INET): + expected = None + if content is not None: + expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, dnstype, content) + query = dns.message.make_query(name, dnstype, want_dnssec=True, options=edns) query.flags |= dns.flags.CD for method in ('sendUDPQuery', 'sendTCPQuery'): sender = getattr(self, method) res = sender(query) - self.assertRRsetInAnswer(res, expected) + if expected is not None: + self.assertRRsetInAnswer(res, expected) # check the protobuf messages corresponding to the query and answer msg = self.getFirstProtobufMessage() @@ -420,18 +402,19 @@ logging: protocol = dnsmessage_pb2.PBDNSMessage.UDP else: protocol = dnsmessage_pb2.PBDNSMessage.TCP - self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name) + self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dnstype, name) # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) # then the response msg = self.getFirstProtobufMessage() self.checkProtobufResponse(msg, protocol, res, '127.0.0.1') - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False) - self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkProtobufOT(msg, True, True) - self.checkProtobufEDE(msg, 0, '') + if content is not None: + self.assertEqual(len(msg.response.rrs), 1) + rr = msg.response.rrs[0] + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dnstype, name, 15, checkTTL=False) + if af is not None: + self.assertEqual(socket.inet_ntop(af, rr.rdata), content) + self.checkProtobufOT(msg, trace, traceid) self.checkNoRemainingMessage() # # again, for a PC cache hit @@ -440,7 +423,8 @@ logging: sender = getattr(self, method) res = sender(query) - self.assertRRsetInAnswer(res, expected) + if expected is not None: + self.assertRRsetInAnswer(res, expected) # check the protobuf messages corresponding to the UDP query and answer msg = self.getFirstProtobufMessage() @@ -448,20 +432,44 @@ logging: protocol = dnsmessage_pb2.PBDNSMessage.UDP else: protocol = dnsmessage_pb2.PBDNSMessage.TCP - self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name) + self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dnstype, name) # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) # then the response msg = self.getFirstProtobufMessage() self.checkProtobufResponse(msg, protocol, res, '127.0.0.1') - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False) - self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkProtobufOT(msg, True, True) - self.checkProtobufEDE(msg, 0, '') + if content is not None: + self.assertEqual(len(msg.response.rrs), 1) + rr = msg.response.rrs[0] + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dnstype, name, 15, checkTTL=False) + if af is not None: + self.assertEqual(socket.inet_ntop(af, rr.rdata), content) + self.checkProtobufOT(msg, trace, traceid) self.checkNoRemainingMessage() + def reloadConfig(self, config): + confdir = os.path.join('configs', ProtobufDefaultTest._confdir) + ProtobufDefaultTest._config_template = config + ProtobufDefaultTest.generateRecursorYamlConfig(confdir, False) + ProtobufDefaultTest.recControl(confdir, 'reload-yaml') + + config_default = """ +recursor: + auth_zones: + - zone: example + file: configs/%s/example.zone + event_trace_enabled: 4 +logging: + protobuf_servers: + - servers: [127.0.0.1:%s, 127.0.0.1:%s] + opentelemetry_trace_conditions: + - acls: ['0.0.0.0/0'] +""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port) + + def testADefault(self): + self.reloadConfig(self.config_default) + self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', True, True) + def testCNAMEDefault(self): self.reloadConfig(self.config_default) name = 'cname.example.' @@ -490,7 +498,6 @@ logging: self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, 'a.example.', 15, checkTTL=False) self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') self.checkProtobufOT(msg, True, True) - self.checkProtobufEDE(msg, 0, '') self.checkNoRemainingMessage() @@ -510,71 +517,16 @@ logging: def testATraceIDOnly(self): self.reloadConfig(self.config_traceid_only) - name = 'a.example.' - expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') edns = self.getOpenTelemetryEDNS(b'012345678012345678') - query = dns.message.make_query(name, 'A', want_dnssec=True, options=[edns]) - query.flags |= dns.flags.CD - for method in ('sendUDPQuery', 'sendTCPQuery'): - sender = getattr(self, method) - res = sender(query) - - self.assertRRsetInAnswer(res, expected) - - # check the protobuf messages corresponding to the query and answer - msg = self.getFirstProtobufMessage() - if method == 'sendUDPQuery': - protocol = dnsmessage_pb2.PBDNSMessage.UDP - else: - protocol = dnsmessage_pb2.PBDNSMessage.TCP - self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name) - # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version - self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) - # then the response - msg = self.getFirstProtobufMessage() - self.checkProtobufResponse(msg, protocol, res, '127.0.0.1') - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False) - self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkProtobufOT(msg, False, True) - self.checkProtobufEDE(msg, 0, '') - self.checkNoRemainingMessage() - # - # again, for a PC cache hit - # - for method in ('sendUDPQuery', 'sendTCPQuery'): - sender = getattr(self, method) - res = sender(query) - - self.assertRRsetInAnswer(res, expected) - - # check the protobuf messages corresponding to the UDP query and answer - msg = self.getFirstProtobufMessage() - if method == 'sendUDPQuery': - protocol = dnsmessage_pb2.PBDNSMessage.UDP - else: - protocol = dnsmessage_pb2.PBDNSMessage.TCP - self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name) - # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version - self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) - # then the response - msg = self.getFirstProtobufMessage() - self.checkProtobufResponse(msg, protocol, res, '127.0.0.1') - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False) - self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkProtobufOT(msg, False, True) - self.checkProtobufEDE(msg, 0, '') - self.checkNoRemainingMessage() + self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', False, True, edns) def testCNAMETraceIDOnly(self): self.reloadConfig(self.config_traceid_only) name = 'cname.example.' expectedCNAME = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'CNAME', 'a.example.') expectedA = dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.42') - query = dns.message.make_query(name, 'A', want_dnssec=True) + edns = self.getOpenTelemetryEDNS(b'012345678012345678') + query = dns.message.make_query(name, 'A', want_dnssec=True, options=[edns]) query.flags |= dns.flags.CD raw = self.sendUDPQuery(query, decode=False) res = dns.message.from_wire(raw) @@ -596,8 +548,7 @@ logging: rr = msg.response.rrs[1] self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, 'a.example.', 15, checkTTL=False) self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkProtobufOT(msg, False, False) - self.checkProtobufEDE(msg, 0, '') + self.checkProtobufOT(msg, False, True) self.checkNoRemainingMessage() config_otaonly = """ @@ -617,90 +568,12 @@ logging: def testAOTAOnly(self): self.reloadConfig(self.config_otaonly) - name = 'a.example.' - expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') - query = dns.message.make_query(name, 'A', want_dnssec=True) - query.flags |= dns.flags.CD - for method in ('sendUDPQuery', 'sendTCPQuery'): - sender = getattr(self, method) - res = sender(query) - - self.assertRRsetInAnswer(res, expected) - - # check the protobuf messages corresponding to the query and answer - msg = self.getFirstProtobufMessage() - if method == 'sendUDPQuery': - protocol = dnsmessage_pb2.PBDNSMessage.UDP - else: - protocol = dnsmessage_pb2.PBDNSMessage.TCP - self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name) - # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version - self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) - # then the response - msg = self.getFirstProtobufMessage() - self.checkProtobufResponse(msg, protocol, res, '127.0.0.1') - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False) - self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkProtobufOT(msg, True, True) - self.checkProtobufEDE(msg, 0, '') - self.checkNoRemainingMessage() - # - # again, for a PC cache hit - # - for method in ('sendUDPQuery', 'sendTCPQuery'): - sender = getattr(self, method) - res = sender(query) - - self.assertRRsetInAnswer(res, expected) - - # check the protobuf messages corresponding to the UDP query and answer - msg = self.getFirstProtobufMessage() - if method == 'sendUDPQuery': - protocol = dnsmessage_pb2.PBDNSMessage.UDP - else: - protocol = dnsmessage_pb2.PBDNSMessage.TCP - self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name) - # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version - self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) - # then the response - msg = self.getFirstProtobufMessage() - self.checkProtobufResponse(msg, protocol, res, '127.0.0.1') - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False) - self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkProtobufOT(msg, True, True) - self.checkProtobufEDE(msg, 0, '') - self.checkNoRemainingMessage() + self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', True, True) def testAAAAOTAOnly(self): self.reloadConfig(self.config_otaonly) - name = 'aaaa.example.' - expectedAAAA = dns.rrset.from_text('aaaa.example.', 0, dns.rdataclass.IN, 'AAAA', '2001:db8::2') edns = self.getOpenTelemetryEDNS(b'012345678012345678') - query = dns.message.make_query(name, 'AAAA', want_dnssec=True, options=[edns]) - query.flags |= dns.flags.CD - raw = self.sendUDPQuery(query, decode=False) - res = dns.message.from_wire(raw) - self.assertRRsetInAnswer(res, expectedAAAA) - - # check the protobuf messages corresponding to the UDP query and answer - # but first let the protobuf messages the time to get there - msg = self.getFirstProtobufMessage() - self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.AAAA, name) - # then the response - msg = self.getFirstProtobufMessage() - self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1', receivedSize=len(raw)) - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.AAAA, 'aaaa.example.', 15, checkTTL=False) - self.assertEqual(socket.inet_ntop(socket.AF_INET6, rr.rdata), '2001:db8::2') - print(msg) - self.checkProtobufOT(msg, False, True) - self.checkProtobufEDE(msg, 0, '') - self.checkNoRemainingMessage() + self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', False, True, edns, socket.AF_INET6) config_nameonly = """ recursor: @@ -719,90 +592,63 @@ logging: def testCorrectNameOnly(self): self.reloadConfig(self.config_nameonly) - name = 'a.example.' - expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42') - query = dns.message.make_query(name, 'A', want_dnssec=True) - query.flags |= dns.flags.CD - for method in ('sendUDPQuery', 'sendTCPQuery'): - sender = getattr(self, method) - res = sender(query) + edns = self.getOpenTelemetryEDNS(b'012345678012345678') + self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', True, True, edns) - self.assertRRsetInAnswer(res, expected) + def testOtherNameOnly(self): + self.reloadConfig(self.config_nameonly) + edns = self.getOpenTelemetryEDNS(b'012345678012345678') + self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', False, True, edns, socket.AF_INET6) - # check the protobuf messages corresponding to the query and answer - msg = self.getFirstProtobufMessage() - if method == 'sendUDPQuery': - protocol = dnsmessage_pb2.PBDNSMessage.UDP - else: - protocol = dnsmessage_pb2.PBDNSMessage.TCP - self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name) - # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version - self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) - # then the response - msg = self.getFirstProtobufMessage() - self.checkProtobufResponse(msg, protocol, res, '127.0.0.1') - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False) - self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkProtobufOT(msg, True, True) - self.checkProtobufEDE(msg, 0, '') - self.checkNoRemainingMessage() - # - # again, for a PC cache hit - # - for method in ('sendUDPQuery', 'sendTCPQuery'): - sender = getattr(self, method) - res = sender(query) + config_nameandtypeonly = """ +recursor: + auth_zones: + - zone: example + file: configs/%s/example.zone + event_trace_enabled: 4 +logging: + protobuf_servers: + - servers: [127.0.0.1:%s, 127.0.0.1:%s] + opentelemetry_trace_conditions: + - acls: ['0.0.0.0/0'] + traceid_only: false + qnames: ['aaaa.example'] + qtypes: ['AAAA'] +""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port) - self.assertRRsetInAnswer(res, expected) + def testCorrectNameAndTypeOnly(self): + self.reloadConfig(self.config_nameandtypeonly) + edns = self.getOpenTelemetryEDNS(b'012345678012345678') + self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', True, True, edns, socket.AF_INET6) - # check the protobuf messages corresponding to the UDP query and answer - msg = self.getFirstProtobufMessage() - if method == 'sendUDPQuery': - protocol = dnsmessage_pb2.PBDNSMessage.UDP - else: - protocol = dnsmessage_pb2.PBDNSMessage.TCP - self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name) - # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version - self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000) - # then the response - msg = self.getFirstProtobufMessage() - self.checkProtobufResponse(msg, protocol, res, '127.0.0.1') - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False) - self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42') - self.checkProtobufOT(msg, True, True) - self.checkProtobufEDE(msg, 0, '') - self.checkNoRemainingMessage() + def testCorrectNameWrongType(self): + self.reloadConfig(self.config_nameandtypeonly) + edns = self.getOpenTelemetryEDNS(b'012345678012345678') + self.runtest('aaaa.example.', dns.rdatatype.A, None, False, True, edns, None) - def testOtherNameOnly(self): - self.reloadConfig(self.config_nameonly) - name = 'aaaa.example.' - expectedAAAA = dns.rrset.from_text('aaaa.example.', 0, dns.rdataclass.IN, 'AAAA', '2001:db8::2') + config_nameandedns = """ +recursor: + auth_zones: + - zone: example + file: configs/%s/example.zone + event_trace_enabled: 4 +logging: + protobuf_servers: + - servers: [127.0.0.1:%s, 127.0.0.1:%s] + opentelemetry_trace_conditions: + - acls: ['0.0.0.0/0'] + qnames: ['aaaa.example'] + edns_option_required: true +""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port) + + def testCorrectNameAndEDNSOnly(self): + self.reloadConfig(self.config_nameandedns) edns = self.getOpenTelemetryEDNS(b'012345678012345678') - query = dns.message.make_query(name, 'AAAA', want_dnssec=True, options=[edns]) - query.flags |= dns.flags.CD - raw = self.sendUDPQuery(query, decode=False) - res = dns.message.from_wire(raw) - self.assertRRsetInAnswer(res, expectedAAAA) + self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', True, True, edns, socket.AF_INET6) - # check the protobuf messages corresponding to the UDP query and answer - # but first let the protobuf messages the time to get there - msg = self.getFirstProtobufMessage() - self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.AAAA, name) - # then the response - msg = self.getFirstProtobufMessage() - self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1', receivedSize=len(raw)) - self.assertEqual(len(msg.response.rrs), 1) - rr = msg.response.rrs[0] - self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.AAAA, 'aaaa.example.', 15, checkTTL=False) - self.assertEqual(socket.inet_ntop(socket.AF_INET6, rr.rdata), '2001:db8::2') - print(msg) - self.checkProtobufOT(msg, False, True) - self.checkProtobufEDE(msg, 0, '') - self.checkNoRemainingMessage() + def testCorrectNameNoEDNS(self): + self.reloadConfig(self.config_nameandedns) + self.runtest('aaaa.example.', dns.rdatatype.A, None, False, False, None, None) class ProtobufProxyMappingTest(TestRecursorProtobuf): """ -- 2.47.3