]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_Interop.py
Merge remote-tracking branch 'origin/master' into auth-ringbugs
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_Interop.py
CommitLineData
962a980d
PL
1import dns
2import socket
3import copy
3a7ddb0b 4import os
962a980d
PL
5from recursortests import RecursorTest
6from twisted.internet.protocol import DatagramProtocol
7from twisted.internet import reactor
8import threading
9
10class testInterop(RecursorTest):
11 _confdir = 'Interop'
12
3a7ddb0b
PL
13 _config_template = """dnssec=validate
14packetcache-ttl=0 # explicitly disable packetcache
15forward-zones=undelegated.secure.example=%s.12
16forward-zones+=undelegated.insecure.example=%s.12
17 """ % (os.environ['PREFIX'], os.environ['PREFIX'])
962a980d
PL
18
19 def testFORMERR(self):
20 """
21 #3841, when we encounter a server that does not understands OPT records
22 (or something else), we don't retry without EDNS in dnssec=validate mode
23 """
24 expected = dns.rrset.from_text('host1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.1')
25
26 query = dns.message.make_query('cname-to-formerr.secure.example.', 'A')
27 res = self.sendUDPQuery(query)
28
29 self.assertRcodeEqual(res, dns.rcode.NOERROR)
30 self.assertRRsetInAnswer(res, expected)
31
694ef440
PL
32 def testCNAMEWithLowerEntries(self):
33 """
34 #4158, When chasing down for DS/DNSKEY and we find a CNAME, skip a level
35 """
36 expected = dns.rrset.from_text('node1.insecure.sub2.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.18')
37
38 query = dns.message.make_query('node1.insecure.sub2.secure.example.', 'A')
39 query.flags |= dns.flags.AD
40 res = self.sendUDPQuery(query)
41
42 self.assertRcodeEqual(res, dns.rcode.NOERROR)
5b44ffbb 43 self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
694ef440
PL
44 self.assertRRsetInAnswer(res, expected)
45
3a7ddb0b
PL
46 def testUndelegatedForwardedZoneExisting(self):
47 """
48 #4369. Ensure we SERVFAIL when forwarding to undelegated zones for a name that exists
49 """
50
51 query = dns.message.make_query('node1.undelegated.secure.example.', 'A')
52 query.flags |= dns.flags.AD
53
54 # twice, so we hit the record cache
55 self.sendUDPQuery(query)
56 res = self.sendUDPQuery(query)
57
58 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
5b44ffbb 59 self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
3a7ddb0b
PL
60
61 def testUndelegatedForwardedZoneNXDOMAIN(self):
62 """
63 #4369. Ensure we SERVFAIL when forwarding to undelegated zones for a name that does not exist
64 """
65
66 query = dns.message.make_query('node2.undelegated.secure.example.', 'A')
67 query.flags |= dns.flags.AD
68
69 # twice, so we hit the negative record cache
70 self.sendUDPQuery(query)
71 res = self.sendUDPQuery(query)
72
73 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
5b44ffbb 74 self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
3a7ddb0b
PL
75
76 def testUndelegatedForwardedInsecureZoneExisting(self):
77 """
78 #4369. Ensure we answer when forwarding to an undelegated zone in an insecure zone for a name that exists
79 """
80
81 expected = dns.rrset.from_text('node1.undelegated.insecure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.22')
82 query = dns.message.make_query('node1.undelegated.insecure.example.', 'A')
83 query.flags |= dns.flags.AD
84
85 # twice, so we hit the record cache
86 self.sendUDPQuery(query)
87 res = self.sendUDPQuery(query)
88
89 self.assertRcodeEqual(res, dns.rcode.NOERROR)
5b44ffbb 90 self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
3a7ddb0b
PL
91 self.assertRRsetInAnswer(res, expected)
92
93 def testUndelegatedForwardedInsecureZoneNXDOMAIN(self):
94 """
95 #4369. Ensure we answer when forwarding to an undelegated zone in an insecure zone for a name that does not exist
96 """
97
98 query = dns.message.make_query('node2.undelegated.insecure.example.', 'A')
99 query.flags |= dns.flags.AD
100
101 # twice, so we hit the negative record cache
102 self.sendUDPQuery(query)
103 res = self.sendUDPQuery(query)
104
105 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
5b44ffbb 106 self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
3a7ddb0b 107
d5b8ccd4
PL
108 def testBothSecureCNAMEAtApex(self):
109 """
110 #4466: a CNAME at the apex of a secure domain to another secure domain made us use the wrong DNSKEY to validate
111 """
112 query = dns.message.make_query('cname-secure.example.', 'A')
113 query.flags |= dns.flags.AD
114
115 res = self.sendUDPQuery(query)
116 expectedCNAME = dns.rrset.from_text('cname-secure.example.', 0, dns.rdataclass.IN, 'CNAME', 'secure.example.')
117 expectedA = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.17')
118
119 self.assertRRsetInAnswer(res, expectedA)
120 self.assertRRsetInAnswer(res, expectedCNAME)
121 self.assertRcodeEqual(res, dns.rcode.NOERROR)
5b44ffbb 122 self.assertMessageHasFlags(res, ['QR', 'RD', 'RA', 'AD'], [])
3a7ddb0b 123
962a980d
PL
124 @classmethod
125 def startResponders(cls):
126 print("Launching responders..")
127
128 address = cls._PREFIX + '.2'
129 port = 53
130
131 reactor.listenUDP(port, UDPResponder(), interface=address)
132
8a3a3822
RG
133 if not reactor.running:
134 cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
135 cls._UDPResponder.setDaemon(True)
136 cls._UDPResponder.start()
962a980d 137
962a980d
PL
138class UDPResponder(DatagramProtocol):
139 def datagramReceived(self, datagram, address):
140 request = dns.message.from_wire(datagram)
141
142 response = dns.message.make_response(request)
143 response.flags = dns.flags.AA + dns.flags.QR
144
145 if request.edns != -1:
146 response.set_rcode(dns.rcode.FORMERR)
147 response.edns = -1
148 response.additional = []
149 else:
033c08c5
RG
150 if request.question[0].name == dns.name.from_text('host1.insecure-formerr.example.') and request.question[0].rdtype == dns.rdatatype.A:
151 answer = dns.rrset.from_text('host1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.1')
152 response.answer.append(answer)
153 elif request.question[0].name == dns.name.from_text('insecure-formerr.example.') and request.question[0].rdtype == dns.rdatatype.NS:
154 answer = dns.rrset.from_text('insecure-formerr.example.', 15, dns.rdataclass.IN, 'NS', 'ns1.insecure-formerr.example.')
155 response.answer.append(answer)
156 additional = dns.rrset.from_text('ns1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.2')
157 response.additional.append(additional)
962a980d
PL
158
159 self.transport.write(response.to_wire(), address)