res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
self.assertRRsetInAnswer(res, expected)
def testUndelegatedForwardedZoneExisting(self):
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
- self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
def testUndelegatedForwardedZoneNXDOMAIN(self):
"""
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
- self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
def testUndelegatedForwardedInsecureZoneExisting(self):
"""
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
self.assertRRsetInAnswer(res, expected)
def testUndelegatedForwardedInsecureZoneNXDOMAIN(self):
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
- self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
+ def testBothSecureCNAMEAtApex(self):
+ """
+ #4466: a CNAME at the apex of a secure domain to another secure domain made us use the wrong DNSKEY to validate
+ """
+ query = dns.message.make_query('cname-secure.example.', 'A')
+ query.flags |= dns.flags.AD
+
+ res = self.sendUDPQuery(query)
+ expectedCNAME = dns.rrset.from_text('cname-secure.example.', 0, dns.rdataclass.IN, 'CNAME', 'secure.example.')
+ expectedA = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.17')
+
+ self.assertRRsetInAnswer(res, expectedA)
+ self.assertRRsetInAnswer(res, expectedCNAME)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RD', 'RA', 'AD'], [])
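Review bot: In testBothSecureCNAMEAtApex the rcode is checked only after the two `assertRRsetInAnswer` calls, whereas every other test on this page asserts the rcode first. If the wrong-DNSKEY bug from #4466 came back, the response would probably be a SERVFAIL with an empty answer section, so the failure would be reported as a missing RRset instead of the rcode. I would move `self.assertRcodeEqual(res, dns.rcode.NOERROR)` directly after `res = self.sendUDPQuery(query)`. The docstring could also say that the expected outcome is a validated answer, i.e. AD set on both the CNAME and the A record.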
@classmethod
def startResponders(cls):
reactor.listenUDP(port, UDPResponder(), interface=address)
- cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
- cls._UDPResponder.setDaemon(True)
- cls._UDPResponder.start()
-
- @classmethod
- def tearDownResponders(cls):
- reactor.stop()
+ if not reactor.running:
+ cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
+ cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.start()
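Review bot: `reactor.listenUDP(port, UDPResponder(), interface=address)` on the context line still runs unconditionally, above the new `if not reactor.running:` guard. From the second call onward it is therefore invoked from the test thread while the reactor is already running in the 'UDP Responder' thread. Twisted's reactor calls are not thread-safe, so in that case the bind should be scheduled with `reactor.callFromThread(reactor.listenUDP, port, UDPResponder(), interface=address)`. `port` and `address` are set outside this hunk; if they are the same on every call, the second bind would presumably fail as well. With tearDownResponders removed, the reactor is never stopped and `cls._UDPResponder` is not assigned on later calls, so a comment such as `# reactor is shared by all test classes in this process and never stopped` would make that intent explicit.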
class UDPResponder(DatagramProtocol):
def datagramReceived(self, datagram, address):
response.edns = -1
response.additional = []
else:
- answer = dns.rrset.from_text('host1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.1')
- response.answer.append(answer)
+ if request.question[0].name == dns.name.from_text('host1.insecure-formerr.example.') and request.question[0].rdtype == dns.rdatatype.A:
+ answer = dns.rrset.from_text('host1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.1')
+ response.answer.append(answer)
+ elif request.question[0].name == dns.name.from_text('insecure-formerr.example.') and request.question[0].rdtype == dns.rdatatype.NS:
+ answer = dns.rrset.from_text('insecure-formerr.example.', 15, dns.rdataclass.IN, 'NS', 'ns1.insecure-formerr.example.')
+ response.answer.append(answer)
+ additional = dns.rrset.from_text('ns1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.2')
+ response.additional.append(additional)
self.transport.write(response.to_wire(), address)
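Review bot: The new branches index `request.question[0]` four times without checking that the question section is non-empty, and there is no final `else`. A datagram with no question would raise IndexError before `self.transport.write`, and a query matching neither name/type pair is answered with an empty answer section, which the resolver under test may treat as a valid NODATA. I would bind the question once after a guard, e.g. `if not request.question: return` then `q = request.question[0]`. An explicit `else:` should either set a distinctive rcode or carry a comment saying the empty answer is intended. The body of datagramReceived starts outside this hunk, so how `request` and `response` are built is not visible here.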