self.assertEqual(msg.deviceName, deviceName)
def checkProtobufEDE(self, msg, ede, edeText):
- print(msg)
+ #print(msg)
self.assertTrue((ede == 0) == (not msg.HasField('ede')))
self.assertTrue((edeText == '') == (not msg.HasField('edeText')))
self.assertEqual(msg.ede, ede)
def generateRecursorConfig(cls, confdir):
super(ProtobufDefaultTest, cls).generateRecursorYamlConfig(confdir, False)
- def reloadConfig(self, config):
- confdir = os.path.join('configs', ProtobufDefaultTest._confdir)
- ProtobufDefaultTest._config_template = config
- ProtobufDefaultTest.generateRecursorYamlConfig(confdir, False)
- ProtobufDefaultTest.recControl(confdir, 'reload-yaml')
-
- config_default = """
-recursor:
- auth_zones:
- - zone: example
- file: configs/%s/example.zone
- event_trace_enabled: 4
-logging:
- protobuf_servers:
- - servers: [127.0.0.1:%s, 127.0.0.1:%s]
- opentelemetry_trace_conditions:
- - acls: ['0.0.0.0/0']
-""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port)
-
- def testADefault(self):
- self.reloadConfig(self.config_default)
- name = 'a.example.'
- expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
- query = dns.message.make_query(name, 'A', want_dnssec=True)
+ def runtest(self, name, dnstype, content, trace, traceid, edns=None, af=socket.AF_INET):
+ expected = None
+ if content is not None:
+ expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, dnstype, content)
+ query = dns.message.make_query(name, dnstype, want_dnssec=True, options=edns)
query.flags |= dns.flags.CD
for method in ('sendUDPQuery', 'sendTCPQuery'):
sender = getattr(self, method)
res = sender(query)
- self.assertRRsetInAnswer(res, expected)
+ if expected is not None:
+ self.assertRRsetInAnswer(res, expected)
# check the protobuf messages corresponding to the query and answer
msg = self.getFirstProtobufMessage()
protocol = dnsmessage_pb2.PBDNSMessage.UDP
else:
protocol = dnsmessage_pb2.PBDNSMessage.TCP
- self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+ self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dnstype, name)
# wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
# then the response
msg = self.getFirstProtobufMessage()
self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
- self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufOT(msg, True, True)
- self.checkProtobufEDE(msg, 0, '')
+ if content is not None:
+ self.assertEqual(len(msg.response.rrs), 1)
+ rr = msg.response.rrs[0]
+ self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dnstype, name, 15, checkTTL=False)
+ if af is not None:
+ self.assertEqual(socket.inet_ntop(af, rr.rdata), content)
+ self.checkProtobufOT(msg, trace, traceid)
self.checkNoRemainingMessage()
#
# again, for a PC cache hit
sender = getattr(self, method)
res = sender(query)
- self.assertRRsetInAnswer(res, expected)
+ if expected is not None:
+ self.assertRRsetInAnswer(res, expected)
# check the protobuf messages corresponding to the UDP query and answer
msg = self.getFirstProtobufMessage()
protocol = dnsmessage_pb2.PBDNSMessage.UDP
else:
protocol = dnsmessage_pb2.PBDNSMessage.TCP
- self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+ self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dnstype, name)
# wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
# then the response
msg = self.getFirstProtobufMessage()
self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
- self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufOT(msg, True, True)
- self.checkProtobufEDE(msg, 0, '')
+ if content is not None:
+ self.assertEqual(len(msg.response.rrs), 1)
+ rr = msg.response.rrs[0]
+ self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dnstype, name, 15, checkTTL=False)
+ if af is not None:
+ self.assertEqual(socket.inet_ntop(af, rr.rdata), content)
+ self.checkProtobufOT(msg, trace, traceid)
self.checkNoRemainingMessage()
+ def reloadConfig(self, config):
+ confdir = os.path.join('configs', ProtobufDefaultTest._confdir)
+ ProtobufDefaultTest._config_template = config
+ ProtobufDefaultTest.generateRecursorYamlConfig(confdir, False)
+ ProtobufDefaultTest.recControl(confdir, 'reload-yaml')
+
+ config_default = """
+recursor:
+ auth_zones:
+ - zone: example
+ file: configs/%s/example.zone
+ event_trace_enabled: 4
+logging:
+ protobuf_servers:
+ - servers: [127.0.0.1:%s, 127.0.0.1:%s]
+ opentelemetry_trace_conditions:
+ - acls: ['0.0.0.0/0']
+""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port)
+
+ def testADefault(self):
+ self.reloadConfig(self.config_default)
+ self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', True, True)
+
def testCNAMEDefault(self):
self.reloadConfig(self.config_default)
name = 'cname.example.'
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, 'a.example.', 15, checkTTL=False)
self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
self.checkProtobufOT(msg, True, True)
- self.checkProtobufEDE(msg, 0, '')
self.checkNoRemainingMessage()
def testATraceIDOnly(self):
self.reloadConfig(self.config_traceid_only)
- name = 'a.example.'
- expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
edns = self.getOpenTelemetryEDNS(b'012345678012345678')
- query = dns.message.make_query(name, 'A', want_dnssec=True, options=[edns])
- query.flags |= dns.flags.CD
- for method in ('sendUDPQuery', 'sendTCPQuery'):
- sender = getattr(self, method)
- res = sender(query)
-
- self.assertRRsetInAnswer(res, expected)
-
- # check the protobuf messages corresponding to the query and answer
- msg = self.getFirstProtobufMessage()
- if method == 'sendUDPQuery':
- protocol = dnsmessage_pb2.PBDNSMessage.UDP
- else:
- protocol = dnsmessage_pb2.PBDNSMessage.TCP
- self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
- self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
- # then the response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
- self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufOT(msg, False, True)
- self.checkProtobufEDE(msg, 0, '')
- self.checkNoRemainingMessage()
- #
- # again, for a PC cache hit
- #
- for method in ('sendUDPQuery', 'sendTCPQuery'):
- sender = getattr(self, method)
- res = sender(query)
-
- self.assertRRsetInAnswer(res, expected)
-
- # check the protobuf messages corresponding to the UDP query and answer
- msg = self.getFirstProtobufMessage()
- if method == 'sendUDPQuery':
- protocol = dnsmessage_pb2.PBDNSMessage.UDP
- else:
- protocol = dnsmessage_pb2.PBDNSMessage.TCP
- self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
- self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
- # then the response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
- self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufOT(msg, False, True)
- self.checkProtobufEDE(msg, 0, '')
- self.checkNoRemainingMessage()
+ self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', False, True, edns)
def testCNAMETraceIDOnly(self):
self.reloadConfig(self.config_traceid_only)
name = 'cname.example.'
expectedCNAME = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'CNAME', 'a.example.')
expectedA = dns.rrset.from_text('a.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.42')
- query = dns.message.make_query(name, 'A', want_dnssec=True)
+ edns = self.getOpenTelemetryEDNS(b'012345678012345678')
+ query = dns.message.make_query(name, 'A', want_dnssec=True, options=[edns])
query.flags |= dns.flags.CD
raw = self.sendUDPQuery(query, decode=False)
res = dns.message.from_wire(raw)
rr = msg.response.rrs[1]
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, 'a.example.', 15, checkTTL=False)
self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufOT(msg, False, False)
- self.checkProtobufEDE(msg, 0, '')
+ self.checkProtobufOT(msg, False, True)
self.checkNoRemainingMessage()
config_otaonly = """
def testAOTAOnly(self):
self.reloadConfig(self.config_otaonly)
- name = 'a.example.'
- expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
- query = dns.message.make_query(name, 'A', want_dnssec=True)
- query.flags |= dns.flags.CD
- for method in ('sendUDPQuery', 'sendTCPQuery'):
- sender = getattr(self, method)
- res = sender(query)
-
- self.assertRRsetInAnswer(res, expected)
-
- # check the protobuf messages corresponding to the query and answer
- msg = self.getFirstProtobufMessage()
- if method == 'sendUDPQuery':
- protocol = dnsmessage_pb2.PBDNSMessage.UDP
- else:
- protocol = dnsmessage_pb2.PBDNSMessage.TCP
- self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
- self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
- # then the response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
- self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufOT(msg, True, True)
- self.checkProtobufEDE(msg, 0, '')
- self.checkNoRemainingMessage()
- #
- # again, for a PC cache hit
- #
- for method in ('sendUDPQuery', 'sendTCPQuery'):
- sender = getattr(self, method)
- res = sender(query)
-
- self.assertRRsetInAnswer(res, expected)
-
- # check the protobuf messages corresponding to the UDP query and answer
- msg = self.getFirstProtobufMessage()
- if method == 'sendUDPQuery':
- protocol = dnsmessage_pb2.PBDNSMessage.UDP
- else:
- protocol = dnsmessage_pb2.PBDNSMessage.TCP
- self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
- self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
- # then the response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
- self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufOT(msg, True, True)
- self.checkProtobufEDE(msg, 0, '')
- self.checkNoRemainingMessage()
+ self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', True, True)
def testAAAAOTAOnly(self):
self.reloadConfig(self.config_otaonly)
- name = 'aaaa.example.'
- expectedAAAA = dns.rrset.from_text('aaaa.example.', 0, dns.rdataclass.IN, 'AAAA', '2001:db8::2')
edns = self.getOpenTelemetryEDNS(b'012345678012345678')
- query = dns.message.make_query(name, 'AAAA', want_dnssec=True, options=[edns])
- query.flags |= dns.flags.CD
- raw = self.sendUDPQuery(query, decode=False)
- res = dns.message.from_wire(raw)
- self.assertRRsetInAnswer(res, expectedAAAA)
-
- # check the protobuf messages corresponding to the UDP query and answer
- # but first let the protobuf messages the time to get there
- msg = self.getFirstProtobufMessage()
- self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.AAAA, name)
- # then the response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1', receivedSize=len(raw))
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.AAAA, 'aaaa.example.', 15, checkTTL=False)
- self.assertEqual(socket.inet_ntop(socket.AF_INET6, rr.rdata), '2001:db8::2')
- print(msg)
- self.checkProtobufOT(msg, False, True)
- self.checkProtobufEDE(msg, 0, '')
- self.checkNoRemainingMessage()
+ self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', False, True, edns, socket.AF_INET6)
config_nameonly = """
recursor:
def testCorrectNameOnly(self):
self.reloadConfig(self.config_nameonly)
- name = 'a.example.'
- expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
- query = dns.message.make_query(name, 'A', want_dnssec=True)
- query.flags |= dns.flags.CD
- for method in ('sendUDPQuery', 'sendTCPQuery'):
- sender = getattr(self, method)
- res = sender(query)
+ edns = self.getOpenTelemetryEDNS(b'012345678012345678')
+ self.runtest('a.example.', dns.rdatatype.A, '192.0.2.42', True, True, edns)
- self.assertRRsetInAnswer(res, expected)
+ def testOtherNameOnly(self):
+ self.reloadConfig(self.config_nameonly)
+ edns = self.getOpenTelemetryEDNS(b'012345678012345678')
+ self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', False, True, edns, socket.AF_INET6)
- # check the protobuf messages corresponding to the query and answer
- msg = self.getFirstProtobufMessage()
- if method == 'sendUDPQuery':
- protocol = dnsmessage_pb2.PBDNSMessage.UDP
- else:
- protocol = dnsmessage_pb2.PBDNSMessage.TCP
- self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
- self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
- # then the response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
- self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufOT(msg, True, True)
- self.checkProtobufEDE(msg, 0, '')
- self.checkNoRemainingMessage()
- #
- # again, for a PC cache hit
- #
- for method in ('sendUDPQuery', 'sendTCPQuery'):
- sender = getattr(self, method)
- res = sender(query)
+ config_nameandtypeonly = """
+recursor:
+ auth_zones:
+ - zone: example
+ file: configs/%s/example.zone
+ event_trace_enabled: 4
+logging:
+ protobuf_servers:
+ - servers: [127.0.0.1:%s, 127.0.0.1:%s]
+ opentelemetry_trace_conditions:
+ - acls: ['0.0.0.0/0']
+ traceid_only: false
+ qnames: ['aaaa.example']
+ qtypes: ['AAAA']
+""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port)
- self.assertRRsetInAnswer(res, expected)
+ def testCorrectNameAndTypeOnly(self):
+ self.reloadConfig(self.config_nameandtypeonly)
+ edns = self.getOpenTelemetryEDNS(b'012345678012345678')
+ self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', True, True, edns, socket.AF_INET6)
- # check the protobuf messages corresponding to the UDP query and answer
- msg = self.getFirstProtobufMessage()
- if method == 'sendUDPQuery':
- protocol = dnsmessage_pb2.PBDNSMessage.UDP
- else:
- protocol = dnsmessage_pb2.PBDNSMessage.TCP
- self.checkProtobufQuery(msg, protocol, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- # wire format, RD and CD set in headerflags, plus DO bit in flags part of EDNS Version
- self.checkProtobufHeaderFlagsAndEDNSVersion(msg, 0x0110, 0x00008000)
- # then the response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, protocol, res, '127.0.0.1')
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
- self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufOT(msg, True, True)
- self.checkProtobufEDE(msg, 0, '')
- self.checkNoRemainingMessage()
+ def testCorrectNameWrongType(self):
+ self.reloadConfig(self.config_nameandtypeonly)
+ edns = self.getOpenTelemetryEDNS(b'012345678012345678')
+ self.runtest('aaaa.example.', dns.rdatatype.A, None, False, True, edns, None)
- def testOtherNameOnly(self):
- self.reloadConfig(self.config_nameonly)
- name = 'aaaa.example.'
- expectedAAAA = dns.rrset.from_text('aaaa.example.', 0, dns.rdataclass.IN, 'AAAA', '2001:db8::2')
+ config_nameandedns = """
+recursor:
+ auth_zones:
+ - zone: example
+ file: configs/%s/example.zone
+ event_trace_enabled: 4
+logging:
+ protobuf_servers:
+ - servers: [127.0.0.1:%s, 127.0.0.1:%s]
+ opentelemetry_trace_conditions:
+ - acls: ['0.0.0.0/0']
+ qnames: ['aaaa.example']
+ edns_option_required: true
+""" % (_confdir, protobufServersParameters[0].port, protobufServersParameters[1].port)
+
+ def testCorrectNameAndEDNSOnly(self):
+ self.reloadConfig(self.config_nameandedns)
edns = self.getOpenTelemetryEDNS(b'012345678012345678')
- query = dns.message.make_query(name, 'AAAA', want_dnssec=True, options=[edns])
- query.flags |= dns.flags.CD
- raw = self.sendUDPQuery(query, decode=False)
- res = dns.message.from_wire(raw)
- self.assertRRsetInAnswer(res, expectedAAAA)
+ self.runtest('aaaa.example.', dns.rdatatype.AAAA, '2001:db8::2', True, True, edns, socket.AF_INET6)
- # check the protobuf messages corresponding to the UDP query and answer
- # but first let the protobuf messages the time to get there
- msg = self.getFirstProtobufMessage()
- self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.AAAA, name)
- # then the response
- msg = self.getFirstProtobufMessage()
- self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res, '127.0.0.1', receivedSize=len(raw))
- self.assertEqual(len(msg.response.rrs), 1)
- rr = msg.response.rrs[0]
- self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.AAAA, 'aaaa.example.', 15, checkTTL=False)
- self.assertEqual(socket.inet_ntop(socket.AF_INET6, rr.rdata), '2001:db8::2')
- print(msg)
- self.checkProtobufOT(msg, False, True)
- self.checkProtobufEDE(msg, 0, '')
- self.checkNoRemainingMessage()
+ def testCorrectNameNoEDNS(self):
+ self.reloadConfig(self.config_nameandedns)
+ self.runtest('aaaa.example.', dns.rdatatype.A, None, False, False, None, None)
class ProtobufProxyMappingTest(TestRecursorProtobuf):
"""