]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_Interop.py
Merge pull request #13927 from rgacogne/fix-fclose-warnings
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_Interop.py
CommitLineData
962a980d
PL
1import dns
2import socket
3import copy
3a7ddb0b 4import os
962a980d
PL
5from recursortests import RecursorTest
6from twisted.internet.protocol import DatagramProtocol
7from twisted.internet import reactor
8import threading
9
class testInterop(RecursorTest):
    """
    Interoperability regression tests run against a recursor configured with
    dnssec=validate, the packet cache disabled, and two forwarded
    'undelegated' zones (see _config_template).
    """
    _confdir = 'Interop'

    _config_template = """dnssec=validate
packetcache-ttl=0 # explicitly disable packetcache
forward-zones=undelegated.secure.example=%s.12
forward-zones+=undelegated.insecure.example=%s.12
    """ % (os.environ['PREFIX'], os.environ['PREFIX'])

    def testFORMERR(self):
        """
        #3841, when we encounter a server that does not understand OPT records
        (or something else), we don't retry without EDNS in dnssec=validate mode
        """
        expected = dns.rrset.from_text('host1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.1')

        query = dns.message.make_query('cname-to-formerr.secure.example.', 'A')
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)

    def testCNAMEWithLowerEntries(self):
        """
        #4158, When chasing down for DS/DNSKEY and we find a CNAME, skip a level
        """
        expected = dns.rrset.from_text('node1.insecure.sub2.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.18')

        query = dns.message.make_query('node1.insecure.sub2.secure.example.', 'A')
        query.flags |= dns.flags.AD
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
        self.assertRRsetInAnswer(res, expected)

    def testUndelegatedForwardedZoneExisting(self):
        """
        #4369. Ensure we SERVFAIL when forwarding to undelegated zones for a name that exists
        """

        query = dns.message.make_query('node1.undelegated.secure.example.', 'A')
        query.flags |= dns.flags.AD

        # twice, so we hit the record cache
        self.sendUDPQuery(query)
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
        self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])

    def testUndelegatedForwardedZoneNXDOMAIN(self):
        """
        #4369. Ensure we SERVFAIL when forwarding to undelegated zones for a name that does not exist
        """

        query = dns.message.make_query('node2.undelegated.secure.example.', 'A')
        query.flags |= dns.flags.AD

        # twice, so we hit the negative record cache
        self.sendUDPQuery(query)
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
        self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])

    def testUndelegatedForwardedInsecureZoneExisting(self):
        """
        #4369. Ensure we answer when forwarding to an undelegated zone in an insecure zone for a name that exists
        """

        expected = dns.rrset.from_text('node1.undelegated.insecure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.22')
        query = dns.message.make_query('node1.undelegated.insecure.example.', 'A')
        query.flags |= dns.flags.AD

        # twice, so we hit the record cache
        self.sendUDPQuery(query)
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
        self.assertRRsetInAnswer(res, expected)

    def testUndelegatedForwardedInsecureZoneNXDOMAIN(self):
        """
        #4369. Ensure we answer when forwarding to an undelegated zone in an insecure zone for a name that does not exist
        """

        query = dns.message.make_query('node2.undelegated.insecure.example.', 'A')
        query.flags |= dns.flags.AD

        # twice, so we hit the negative record cache
        self.sendUDPQuery(query)
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
        self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])

    def testBothSecureCNAMEAtApex(self):
        """
        #4466: a CNAME at the apex of a secure domain to another secure domain made us use the wrong DNSKEY to validate
        """
        query = dns.message.make_query('cname-secure.example.', 'A')
        query.flags |= dns.flags.AD

        res = self.sendUDPQuery(query)
        expectedCNAME = dns.rrset.from_text('cname-secure.example.', 0, dns.rdataclass.IN, 'CNAME', 'secure.example.')
        expectedA = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.17')

        self.assertRRsetInAnswer(res, expectedA)
        self.assertRRsetInAnswer(res, expectedCNAME)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertMessageHasFlags(res, ['QR', 'RD', 'RA', 'AD'], [])

    def testNonApexDNSKEY(self):
        """
        a DNSKEY not at the apex of a zone should not be treated as a DNSKEY in validation
        """
        query = dns.message.make_query('non-apex-dnskey.secure.example.', 'DNSKEY')
        query.flags |= dns.flags.AD

        res = self.sendUDPQuery(query)
        print(res)
        expectedDNSKEY = dns.rrset.from_text('non-apex-dnskey.secure.example.', 0, dns.rdataclass.IN, 'DNSKEY', '257 3 13 CT6AJ4MEOtNDgj0+xLtTLGHf1WbLsKWZI8ONHOt/6q7hTjeWSnY/SGig1dIKZrHg+pJFUSPaxeShv48SYVRKEg==')

        self.assertRRsetInAnswer(res, expectedDNSKEY)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertMessageHasFlags(res, ['QR', 'RD', 'RA', 'AD'], [])

    @classmethod
    def startResponders(cls):
        """
        Bind a UDPResponder on <cls._PREFIX>.2, port 53, and make sure the
        Twisted reactor is running in a background daemon thread.
        """
        print("Launching responders..")

        address = cls._PREFIX + '.2'
        port = 53

        reactor.listenUDP(port, UDPResponder(), interface=address)

        # The reactor is a process-wide singleton: only start it if nothing
        # else has done so already.
        if not reactor.running:
            # NOTE(review): the positional False is presumably
            # installSignalHandlers, needed because the reactor runs outside
            # the main thread - confirm against the Twisted documentation.
            cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
            # Thread.setDaemon() is deprecated since Python 3.10; the daemon
            # attribute is the supported spelling.
            cls._UDPResponder.daemon = True
            cls._UDPResponder.start()
962a980d 153
29ad8796
PD
class testInteropProcess(RecursorTest):
    """
    Non-apex DNSKEY regression tests run against a recursor configured with
    dnssec=process (see _config_template).
    """
    _confdir = 'InteropProcess'

    _config_template = """dnssec=process
packetcache-ttl=0 # explicitly disable packetcache
forward-zones=undelegated.secure.example=%s.12
forward-zones+=undelegated.insecure.example=%s.12
    """ % (os.environ['PREFIX'], os.environ['PREFIX'])

    def _queryAndExpectDNSKEY(self, qname, qtype, queryFlag, keyData, responseFlags):
        """
        Send one UDP query for qname/qtype with queryFlag set in the header,
        print the response, then require that it is NOERROR, contains the
        DNSKEY rrset built from keyData (owner qname, TTL 0) and passes
        assertMessageHasFlags with responseFlags.
        """
        msg = dns.message.make_query(qname, qtype)
        msg.flags |= queryFlag

        reply = self.sendUDPQuery(msg)
        print(reply)
        wanted = dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'DNSKEY', keyData)

        self.assertRRsetInAnswer(reply, wanted)
        self.assertRcodeEqual(reply, dns.rcode.NOERROR)
        self.assertMessageHasFlags(reply, responseFlags, [])

    def testNonApexDNSKEY2(self):
        """
        a DNSKEY not at the apex of a zone should not be treated as a DNSKEY in validation,
        even when it was cached with Indeterminate validation state before
        """
        owner = 'non-apex-dnskey2.secure.example.'
        keyData = '256 3 13 CT6AJ4MEOtNDgj0+xLtTLGHf1WbLsKWZI8ONHOt/6q7hTjeWSnY/SGig1dIKZrHg+pJFUSPaxeShv48SYVRKEg=='

        # send the query with +CD so the record ends up cached with Indeterminate validation state
        self._queryAndExpectDNSKEY(owner, 'DNSKEY', dns.flags.CD, keyData, ['QR', 'RD', 'RA', 'CD'])

        # now ask again with +AD so it has to be validated from cache
        self._queryAndExpectDNSKEY(owner, 'DNSKEY', dns.flags.AD, keyData, ['QR', 'RD', 'RA', 'AD'])

    def testNonApexDNSKEYANYQuery(self):
        """
        a DNSKEY not at the apex of a zone should not be treated as a DNSKEY in validation,
        even when it was cached with Indeterminate validation state before.
        This code tests the ANY path which is separate from the qtype=DNSKEY path tested in testNonApexDNSKEY2
        """
        owner = 'non-apex-dnskey3.secure.example.'
        keyData = '256 3 13 DT6AJ4MEOtNDgj0+xLtTLGHf1WbLsKWZI8ONHOt/6q7hTjeWSnY/SGig1dIKZrHg+pJFUSPaxeShv48SYVRKEg=='

        # send the query with +CD so the record ends up cached with Indeterminate validation state
        self._queryAndExpectDNSKEY(owner, 'DNSKEY', dns.flags.CD, keyData, ['QR', 'RD', 'RA', 'CD'])

        # now ask again with +AD, this time as an ANY query, so it has to be validated from cache
        self._queryAndExpectDNSKEY(owner, 'ANY', dns.flags.AD, keyData, ['QR', 'RD', 'RA', 'AD'])
223
962a980d
PL
class UDPResponder(DatagramProtocol):
    """
    Minimal UDP DNS responder used by the FORMERR test: it answers FORMERR
    to any query whose edns attribute is not -1, and otherwise serves a
    small fixed set of records for insecure-formerr.example.
    """

    def datagramReceived(self, datagram, address):
        """
        Parse the incoming datagram as a DNS query and write a reply back
        to address.
        """
        request = dns.message.from_wire(datagram)

        response = dns.message.make_response(request)
        response.flags = dns.flags.AA + dns.flags.QR

        if request.edns != -1:
            # Reply with FORMERR, EDNS turned off (edns=-1) and an empty
            # additional section.
            response.set_rcode(dns.rcode.FORMERR)
            response.use_edns(edns=-1)
            response.additional = []
            self.transport.write(response.to_wire(), address)
            return

        question = request.question[0]
        qname = question.name
        qtype = question.rdtype

        if qname == dns.name.from_text('host1.insecure-formerr.example.') and qtype == dns.rdatatype.A:
            response.answer.append(
                dns.rrset.from_text('host1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.1'))
        elif qname == dns.name.from_text('insecure-formerr.example.') and qtype == dns.rdatatype.NS:
            response.answer.append(
                dns.rrset.from_text('insecure-formerr.example.', 15, dns.rdataclass.IN, 'NS', 'ns1.insecure-formerr.example.'))
            # Glue address for the name server named in the NS answer.
            response.additional.append(
                dns.rrset.from_text('ns1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.2'))

        # Any other question is answered with empty sections.
        self.transport.write(response.to_wire(), address)