]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Refactor common code in tests and add a few tests 16362/head
authorOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 12 Nov 2025 09:15:13 +0000 (10:15 +0100)
committerOtto Moerbeek <otto.moerbeek@open-xchange.com>
Wed, 12 Nov 2025 09:47:02 +0000 (10:47 +0100)
Signed-off-by: Otto Moerbeek <otto.moerbeek@open-xchange.com>
regression-tests.recursor-dnssec/test_Protobuf.py

index 6397ff1c8ae62c521dc7945aea752e69494d3fe8..3e27227c7172655d251ba88c75e1ce1216c4d494 100644 (file)
@@ -290,7 +290,7 @@ class TestRecursorProtobuf(RecursorTest):
         self.assertEqual(msg.deviceName, deviceName)
 
     def checkProtobufEDE(self, msg, ede, edeText):
-        print(msg)
+        #print(msg)
         self.assertTrue((ede == 0) == (not msg.HasField('ede')))
         self.assertTrue((edeText == '') == (not msg.HasField('edeText')))
         self.assertEqual(msg.ede, ede)
@@ -383,36 +383,18 @@ logging:
     def generateRecursorConfig(cls, confdir):
         super(ProtobufDefaultTest, cls).generateRecursorYamlConfig(confdir, False)
 
-    def reloadConfig(self, config):
-      confdir = os.path.join('configs', ProtobufDefaultTest._confdir)
-      ProtobufDefaultTest._config_template = config
-      ProtobufDefaultTest.generateRecursorYamlConfig(confdir, False)
-      ProtobufDefaultTest.recControl(confdir, 'reload-yaml')
-
-    config_default = """
-recursor:
-    auth_zones:
-    - zone: example
-      file: configs/%s/example.zone
-    event_trace_enabled: 4
-logging:
-  protobuf_servers:
-    - servers: [127.0.0.1:%s, 127.0.0.1:%s]
-  opentelemetry_trace_conditions:
-    - acls: ['0.0.0.0/0']
-""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port)
-
-    def testADefault(self):
-        self.reloadConfig(self.config_default)
-        name = 'a.example.'
-        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
-        query = dns.message.make_query(name, 'A', want_dnssec=True)
+    def runtest(self, name, dnstype, content, trace, traceid, edns=None, af=socket.AF_INET):
+        expected = None
+        if content is not None:
+            expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, dnstype, content)
+        query = dns.message.make_query(name, dnstype, want_dnssec=True, options=edns)
         query.flags |= dns.flags.CD
         for method in ('sendUDPQuery', 'sendTCPQuery'):
             sender = getattr(self, method)
             res = sender(query)
 
-            self.assertRRsetInAnswer(res, expected)
+            if expected is not None:
+                self.assertRRsetInAnswer(res, expected)
 
             # check the protobuf messages corresponding to the query and answer
             msg = self.getFirstProtobufMessage()
@@ -420,18 +402,19 @@ logging:
                 protocol = dnsmessage_pb2.PBDNSMessage.UDP
             else:
                 protocol = dnsmessage_pb2.PBDNSMessage.TCP
-            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dnstype, name)
             # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
             self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
             # then the response
             msg = self.getFirstProtobufMessage()
             self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
-            self.assertEqual(len(msg.response.rrs), 1)
-            rr = msg.response.rrs[0]
-            self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
-            self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-            self.checkProtobufOT(msg, True, True)
-            self.checkProtobufEDE(msg, 0, '')
+            if content is not None:
+                self.assertEqual(len(msg.response.rrs), 1)
+                rr = msg.response.rrs[0]
+                self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dnstype, name, 15, checkTTL=False)
+                if af is not None:
+                    self.assertEqual(socket.inet_ntop(af, rr.rdata), content)
+            self.checkProtobufOT(msg, trace, traceid)
             self.checkNoRemainingMessage()
         #
         # again, for a PC cache hit
@@ -440,7 +423,8 @@ logging:
             sender = getattr(self, method)
             res = sender(query)
 
-            self.assertRRsetInAnswer(res, expected)
+            if expected is not None:
+                self.assertRRsetInAnswer(res, expected)
 
             # check the protobuf messages corresponding to the UDP query and answer
             msg = self.getFirstProtobufMessage()
@@ -448,20 +432,44 @@ logging:
                 protocol = dnsmessage_pb2.PBDNSMessage.UDP
             else:
                 protocol = dnsmessage_pb2.PBDNSMessage.TCP
-            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dnstype, name)
             # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
             self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
             # then the response
             msg = self.getFirstProtobufMessage()
             self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
-            self.assertEqual(len(msg.response.rrs), 1)
-            rr = msg.response.rrs[0]
-            self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
-            self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-            self.checkProtobufOT(msg, True, True)
-            self.checkProtobufEDE(msg, 0, '')
+            if content is not None:
+                self.assertEqual(len(msg.response.rrs), 1)
+                rr = msg.response.rrs[0]
+                self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dnstype, name, 15, checkTTL=False)
+                if af is not None:
+                    self.assertEqual(socket.inet_ntop(af, rr.rdata), content)
+            self.checkProtobufOT(msg, trace, traceid)
             self.checkNoRemainingMessage()
 
+    def reloadConfig(self, config):
+      confdir = os.path.join('configs', ProtobufDefaultTest._confdir)
+      ProtobufDefaultTest._config_template = config
+      ProtobufDefaultTest.generateRecursorYamlConfig(confdir, False)
+      ProtobufDefaultTest.recControl(confdir, 'reload-yaml')
+
+    config_default = """
+recursor:
+    auth_zones:
+    - zone: example
+      file: configs/%s/example.zone
+    event_trace_enabled: 4
+logging:
+  protobuf_servers:
+    - servers: [127.0.0.1:%s, 127.0.0.1:%s]
+  opentelemetry_trace_conditions:
+    - acls: ['0.0.0.0/0']
+""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port)
+
+    def testADefault(self):
+        self.reloadConfig(self.config_default)
+        self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', True, True)
+
     def testCNAMEDefault(self):
         self.reloadConfig(self.config_default)
         name = 'cname.example.'
@@ -490,7 +498,6 @@ logging:
         self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, 'a.example.', 15, checkTTL=False)
         self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
         self.checkProtobufOT(msg, True, True)
-        self.checkProtobufEDE(msg, 0, '')
         self.checkNoRemainingMessage()
 
 
@@ -510,71 +517,16 @@ logging:
 
     def testATraceIDOnly(self):
         self.reloadConfig(self.config_traceid_only)
-        name = 'a.example.'
-        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
         edns = self.getOpenTelemetryEDNS(b'012345678012345678')
-        query = dns.message.make_query(name, 'A', want_dnssec=True, options=[edns])
-        query.flags |= dns.flags.CD
-        for method in ('sendUDPQuery', 'sendTCPQuery'):
-            sender = getattr(self, method)
-            res = sender(query)
-
-            self.assertRRsetInAnswer(res, expected)
-
-            # check the protobuf messages corresponding to the query and answer
-            msg = self.getFirstProtobufMessage()
-            if method == 'sendUDPQuery':
-                protocol = dnsmessage_pb2.PBDNSMessage.UDP
-            else:
-                protocol = dnsmessage_pb2.PBDNSMessage.TCP
-            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
-            # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
-            self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
-            # then the response
-            msg = self.getFirstProtobufMessage()
-            self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
-            self.assertEqual(len(msg.response.rrs), 1)
-            rr = msg.response.rrs[0]
-            self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
-            self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-            self.checkProtobufOT(msg, False, True)
-            self.checkProtobufEDE(msg, 0, '')
-            self.checkNoRemainingMessage()
-        #
-        # again, for a PC cache hit
-        #
-        for method in ('sendUDPQuery', 'sendTCPQuery'):
-            sender = getattr(self, method)
-            res = sender(query)
-
-            self.assertRRsetInAnswer(res, expected)
-
-            # check the protobuf messages corresponding to the UDP query and answer
-            msg = self.getFirstProtobufMessage()
-            if method == 'sendUDPQuery':
-                protocol = dnsmessage_pb2.PBDNSMessage.UDP
-            else:
-                protocol = dnsmessage_pb2.PBDNSMessage.TCP
-            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
-            # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
-            self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
-            # then the response
-            msg = self.getFirstProtobufMessage()
-            self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
-            self.assertEqual(len(msg.response.rrs), 1)
-            rr = msg.response.rrs[0]
-            self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
-            self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-            self.checkProtobufOT(msg, False, True)
-            self.checkProtobufEDE(msg, 0, '')
-            self.checkNoRemainingMessage()
+        self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', False, True, edns)
 
     def testCNAMETraceIDOnly(self):
         self.reloadConfig(self.config_traceid_only)
         name = 'cname.example.'
         expectedCNAME = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'CNAME', 'a.example.')
         expectedA = dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.42')
-        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        edns = self.getOpenTelemetryEDNS(b'012345678012345678')
+        query = dns.message.make_query(name, 'A', want_dnssec=True, options=[edns])
         query.flags |= dns.flags.CD
         raw = self.sendUDPQuery(query, decode=False)
         res = dns.message.from_wire(raw)
@@ -596,8 +548,7 @@ logging:
         rr = msg.response.rrs[1]
         self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, 'a.example.', 15, checkTTL=False)
         self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-        self.checkProtobufOT(msg, False, False)
-        self.checkProtobufEDE(msg, 0, '')
+        self.checkProtobufOT(msg, False, True)
         self.checkNoRemainingMessage()
 
     config_otaonly = """
@@ -617,90 +568,12 @@ logging:
 
     def testAOTAOnly(self):
         self.reloadConfig(self.config_otaonly)
-        name = 'a.example.'
-        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
-        query = dns.message.make_query(name, 'A', want_dnssec=True)
-        query.flags |= dns.flags.CD
-        for method in ('sendUDPQuery', 'sendTCPQuery'):
-            sender = getattr(self, method)
-            res = sender(query)
-
-            self.assertRRsetInAnswer(res, expected)
-
-            # check the protobuf messages corresponding to the query and answer
-            msg = self.getFirstProtobufMessage()
-            if method == 'sendUDPQuery':
-                protocol = dnsmessage_pb2.PBDNSMessage.UDP
-            else:
-                protocol = dnsmessage_pb2.PBDNSMessage.TCP
-            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
-            # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
-            self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
-            # then the response
-            msg = self.getFirstProtobufMessage()
-            self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
-            self.assertEqual(len(msg.response.rrs), 1)
-            rr = msg.response.rrs[0]
-            self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
-            self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-            self.checkProtobufOT(msg, True, True)
-            self.checkProtobufEDE(msg, 0, '')
-            self.checkNoRemainingMessage()
-        #
-        # again, for a PC cache hit
-        #
-        for method in ('sendUDPQuery', 'sendTCPQuery'):
-            sender = getattr(self, method)
-            res = sender(query)
-
-            self.assertRRsetInAnswer(res, expected)
-
-            # check the protobuf messages corresponding to the UDP query and answer
-            msg = self.getFirstProtobufMessage()
-            if method == 'sendUDPQuery':
-                protocol = dnsmessage_pb2.PBDNSMessage.UDP
-            else:
-                protocol = dnsmessage_pb2.PBDNSMessage.TCP
-            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
-            # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
-            self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
-            # then the response
-            msg = self.getFirstProtobufMessage()
-            self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
-            self.assertEqual(len(msg.response.rrs), 1)
-            rr = msg.response.rrs[0]
-            self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
-            self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-            self.checkProtobufOT(msg, True, True)
-            self.checkProtobufEDE(msg, 0, '')
-            self.checkNoRemainingMessage()
+        self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', True, True)
 
     def testAAAAOTAOnly(self):
         self.reloadConfig(self.config_otaonly)
-        name = 'aaaa.example.'
-        expectedAAAA = dns.rrset.from_text('aaaa.example.', 0, dns.rdataclass.IN, 'AAAA', '2001:db8::2')
         edns = self.getOpenTelemetryEDNS(b'012345678012345678')
-        query = dns.message.make_query(name, 'AAAA', want_dnssec=True, options=[edns])
-        query.flags |= dns.flags.CD
-        raw = self.sendUDPQuery(query, decode=False)
-        res = dns.message.from_wire(raw)
-        self.assertRRsetInAnswer(res, expectedAAAA)
-
-        # check the protobuf messages corresponding to the UDP query and answer
-        # but first let the protobuf messages the time to get there
-        msg = self.getFirstProtobufMessage()
-        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.AAAA, name)
-        # then the response
-        msg = self.getFirstProtobufMessage()
-        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1', receivedSize=len(raw))
-        self.assertEqual(len(msg.response.rrs), 1)
-        rr = msg.response.rrs[0]
-        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.AAAA, 'aaaa.example.', 15, checkTTL=False)
-        self.assertEqual(socket.inet_ntop(socket.AF_INET6, rr.rdata), '2001:db8::2')
-        print(msg)
-        self.checkProtobufOT(msg, False, True)
-        self.checkProtobufEDE(msg, 0, '')
-        self.checkNoRemainingMessage()
+        self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', False, True, edns, socket.AF_INET6)
 
     config_nameonly = """
 recursor:
@@ -719,90 +592,63 @@ logging:
 
     def testCorrectNameOnly(self):
         self.reloadConfig(self.config_nameonly)
-        name = 'a.example.'
-        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
-        query = dns.message.make_query(name, 'A', want_dnssec=True)
-        query.flags |= dns.flags.CD
-        for method in ('sendUDPQuery', 'sendTCPQuery'):
-            sender = getattr(self, method)
-            res = sender(query)
+        edns = self.getOpenTelemetryEDNS(b'012345678012345678')
+        self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', True, True, edns)
 
-            self.assertRRsetInAnswer(res, expected)
+    def testOtherNameOnly(self):
+        self.reloadConfig(self.config_nameonly)
+        edns = self.getOpenTelemetryEDNS(b'012345678012345678')
+        self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', False, True, edns, socket.AF_INET6)
 
-            # check the protobuf messages corresponding to the query and answer
-            msg = self.getFirstProtobufMessage()
-            if method == 'sendUDPQuery':
-                protocol = dnsmessage_pb2.PBDNSMessage.UDP
-            else:
-                protocol = dnsmessage_pb2.PBDNSMessage.TCP
-            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
-            # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
-            self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
-            # then the response
-            msg = self.getFirstProtobufMessage()
-            self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
-            self.assertEqual(len(msg.response.rrs), 1)
-            rr = msg.response.rrs[0]
-            self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
-            self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-            self.checkProtobufOT(msg, True, True)
-            self.checkProtobufEDE(msg, 0, '')
-            self.checkNoRemainingMessage()
-        #
-        # again, for a PC cache hit
-        #
-        for method in ('sendUDPQuery', 'sendTCPQuery'):
-            sender = getattr(self, method)
-            res = sender(query)
+    config_nameandtypeonly = """
+recursor:
+    auth_zones:
+    - zone: example
+      file: configs/%s/example.zone
+    event_trace_enabled: 4
+logging:
+  protobuf_servers:
+    - servers: [127.0.0.1:%s, 127.0.0.1:%s]
+  opentelemetry_trace_conditions:
+    - acls: ['0.0.0.0/0']
+      traceid_only: false
+      qnames: ['aaaa.example']
+      qtypes: ['AAAA']
+""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port)
 
-            self.assertRRsetInAnswer(res, expected)
+    def testCorrectNameAndTypeOnly(self):
+        self.reloadConfig(self.config_nameandtypeonly)
+        edns = self.getOpenTelemetryEDNS(b'012345678012345678')
+        self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', True, True, edns, socket.AF_INET6)
 
-            # check the protobuf messages corresponding to the UDP query and answer
-            msg = self.getFirstProtobufMessage()
-            if method == 'sendUDPQuery':
-                protocol = dnsmessage_pb2.PBDNSMessage.UDP
-            else:
-                protocol = dnsmessage_pb2.PBDNSMessage.TCP
-            self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
-            # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
-            self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
-            # then the response
-            msg = self.getFirstProtobufMessage()
-            self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
-            self.assertEqual(len(msg.response.rrs), 1)
-            rr = msg.response.rrs[0]
-            self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
-            self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
-            self.checkProtobufOT(msg, True, True)
-            self.checkProtobufEDE(msg, 0, '')
-            self.checkNoRemainingMessage()
+    def testCorrectNameWrongType(self):
+        self.reloadConfig(self.config_nameandtypeonly)
+        edns = self.getOpenTelemetryEDNS(b'012345678012345678')
+        self.runtest('aaaa.example.', dns.rdatatype.A, None, False, True, edns, None)
 
-    def testOtherNameOnly(self):
-        self.reloadConfig(self.config_nameonly)
-        name = 'aaaa.example.'
-        expectedAAAA = dns.rrset.from_text('aaaa.example.', 0, dns.rdataclass.IN, 'AAAA', '2001:db8::2')
+    config_nameandedns = """
+recursor:
+    auth_zones:
+    - zone: example
+      file: configs/%s/example.zone
+    event_trace_enabled: 4
+logging:
+  protobuf_servers:
+    - servers: [127.0.0.1:%s, 127.0.0.1:%s]
+  opentelemetry_trace_conditions:
+    - acls: ['0.0.0.0/0']
+      qnames: ['aaaa.example']
+      edns_option_required: true
+""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port)
+
+    def testCorrectNameAndEDNSOnly(self):
+        self.reloadConfig(self.config_nameandedns)
         edns = self.getOpenTelemetryEDNS(b'012345678012345678')
-        query = dns.message.make_query(name, 'AAAA', want_dnssec=True, options=[edns])
-        query.flags |= dns.flags.CD
-        raw = self.sendUDPQuery(query, decode=False)
-        res = dns.message.from_wire(raw)
-        self.assertRRsetInAnswer(res, expectedAAAA)
+        self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', True, True, edns, socket.AF_INET6)
 
-        # check the protobuf messages corresponding to the UDP query and answer
-        # but first let the protobuf messages the time to get there
-        msg = self.getFirstProtobufMessage()
-        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.AAAA, name)
-        # then the response
-        msg = self.getFirstProtobufMessage()
-        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1', receivedSize=len(raw))
-        self.assertEqual(len(msg.response.rrs), 1)
-        rr = msg.response.rrs[0]
-        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.AAAA, 'aaaa.example.', 15, checkTTL=False)
-        self.assertEqual(socket.inet_ntop(socket.AF_INET6, rr.rdata), '2001:db8::2')
-        print(msg)
-        self.checkProtobufOT(msg, False, True)
-        self.checkProtobufEDE(msg, 0, '')
-        self.checkNoRemainingMessage()
+    def testCorrectNameNoEDNS(self):
+        self.reloadConfig(self.config_nameandedns)
+        self.runtest('aaaa.example.', dns.rdatatype.A, None, False, False, None, None)
 
 class ProtobufProxyMappingTest(TestRecursorProtobuf):
     """