]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_Interop.py
5 from recursortests
import RecursorTest
6 from twisted
.internet
.protocol
import DatagramProtocol
7 from twisted
.internet
import reactor
10 class testInterop(RecursorTest
):
13 _config_template
= """dnssec=validate
14 packetcache-ttl=0 # explicitly disable packetcache
15 forward-zones=undelegated.secure.example=%s.12
16 forward-zones+=undelegated.insecure.example=%s.12
17 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
19 def testFORMERR(self
):
21 #3841, when we encounter a server that does not understands OPT records
22 (or something else), we don't retry without EDNS in dnssec=validate mode
24 expected
= dns
.rrset
.from_text('host1.insecure-formerr.example.', 15, dns
.rdataclass
.IN
, 'A', '127.0.0.1')
26 query
= dns
.message
.make_query('cname-to-formerr.secure.example.', 'A')
27 res
= self
.sendUDPQuery(query
)
29 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
30 self
.assertRRsetInAnswer(res
, expected
)
32 def testCNAMEWithLowerEntries(self
):
34 #4158, When chasing down for DS/DNSKEY and we find a CNAME, skip a level
36 expected
= dns
.rrset
.from_text('node1.insecure.sub2.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '192.0.2.18')
38 query
= dns
.message
.make_query('node1.insecure.sub2.secure.example.', 'A')
39 query
.flags |
= dns
.flags
.AD
40 res
= self
.sendUDPQuery(query
)
42 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
43 self
.assertMessageHasFlags(res
, ['QR', 'RA', 'RD'], [])
44 self
.assertRRsetInAnswer(res
, expected
)
46 def testUndelegatedForwardedZoneExisting(self
):
48 #4369. Ensure we SERVFAIL when forwarding to undelegated zones for a name that exists
51 query
= dns
.message
.make_query('node1.undelegated.secure.example.', 'A')
52 query
.flags |
= dns
.flags
.AD
54 # twice, so we hit the record cache
55 self
.sendUDPQuery(query
)
56 res
= self
.sendUDPQuery(query
)
58 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
59 self
.assertMessageHasFlags(res
, ['QR', 'RA', 'RD'], [])
61 def testUndelegatedForwardedZoneNXDOMAIN(self
):
63 #4369. Ensure we SERVFAIL when forwarding to undelegated zones for a name that does not exist
66 query
= dns
.message
.make_query('node2.undelegated.secure.example.', 'A')
67 query
.flags |
= dns
.flags
.AD
69 # twice, so we hit the negative record cache
70 self
.sendUDPQuery(query
)
71 res
= self
.sendUDPQuery(query
)
73 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
74 self
.assertMessageHasFlags(res
, ['QR', 'RA', 'RD'], [])
76 def testUndelegatedForwardedInsecureZoneExisting(self
):
78 #4369. Ensure we answer when forwarding to an undelegated zone in an insecure zone for a name that exists
81 expected
= dns
.rrset
.from_text('node1.undelegated.insecure.example.', 0, dns
.rdataclass
.IN
, 'A', '192.0.2.22')
82 query
= dns
.message
.make_query('node1.undelegated.insecure.example.', 'A')
83 query
.flags |
= dns
.flags
.AD
85 # twice, so we hit the record cache
86 self
.sendUDPQuery(query
)
87 res
= self
.sendUDPQuery(query
)
89 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
90 self
.assertMessageHasFlags(res
, ['QR', 'RA', 'RD'], [])
91 self
.assertRRsetInAnswer(res
, expected
)
93 def testUndelegatedForwardedInsecureZoneNXDOMAIN(self
):
95 #4369. Ensure we answer when forwarding to an undelegated zone in an insecure zone for a name that does not exist
98 query
= dns
.message
.make_query('node2.undelegated.insecure.example.', 'A')
99 query
.flags |
= dns
.flags
.AD
101 # twice, so we hit the negative record cache
102 self
.sendUDPQuery(query
)
103 res
= self
.sendUDPQuery(query
)
105 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
106 self
.assertMessageHasFlags(res
, ['QR', 'RA', 'RD'], [])
108 def testBothSecureCNAMEAtApex(self
):
110 #4466: a CNAME at the apex of a secure domain to another secure domain made us use the wrong DNSKEY to validate
112 query
= dns
.message
.make_query('cname-secure.example.', 'A')
113 query
.flags |
= dns
.flags
.AD
115 res
= self
.sendUDPQuery(query
)
116 expectedCNAME
= dns
.rrset
.from_text('cname-secure.example.', 0, dns
.rdataclass
.IN
, 'CNAME', 'secure.example.')
117 expectedA
= dns
.rrset
.from_text('secure.example.', 0, dns
.rdataclass
.IN
, 'A', '192.0.2.17')
119 self
.assertRRsetInAnswer(res
, expectedA
)
120 self
.assertRRsetInAnswer(res
, expectedCNAME
)
121 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
122 self
.assertMessageHasFlags(res
, ['QR', 'RD', 'RA', 'AD'], [])
124 def testNonApexDNSKEY(self
):
126 a DNSKEY not at the apex of a zone should not be treated as a DNSKEY in validation
128 query
= dns
.message
.make_query('non-apex-dnskey.secure.example.', 'DNSKEY')
129 query
.flags |
= dns
.flags
.AD
131 res
= self
.sendUDPQuery(query
)
133 expectedDNSKEY
= dns
.rrset
.from_text('non-apex-dnskey.secure.example.', 0, dns
.rdataclass
.IN
, 'DNSKEY', '257 3 13 CT6AJ4MEOtNDgj0+xLtTLGHf1WbLsKWZI8ONHOt/6q7hTjeWSnY/SGig1dIKZrHg+pJFUSPaxeShv48SYVRKEg==')
135 self
.assertRRsetInAnswer(res
, expectedDNSKEY
)
136 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
137 self
.assertMessageHasFlags(res
, ['QR', 'RD', 'RA', 'AD'], [])
141 def startResponders(cls
):
142 print("Launching responders..")
144 address
= cls
._PREFIX
+ '.2'
147 reactor
.listenUDP(port
, UDPResponder(), interface
=address
)
149 if not reactor
.running
:
150 cls
._UDPResponder
= threading
.Thread(name
='UDP Responder', target
=reactor
.run
, args
=(False,))
151 cls
._UDPResponder
.setDaemon(True)
152 cls
._UDPResponder
.start()
154 class testInteropProcess(RecursorTest
):
155 _confdir
= 'InteropProcess'
157 _config_template
= """dnssec=process
158 packetcache-ttl=0 # explicitly disable packetcache
159 forward-zones=undelegated.secure.example=%s.12
160 forward-zones+=undelegated.insecure.example=%s.12
161 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
163 def testNonApexDNSKEY2(self
):
165 a DNSKEY not at the apex of a zone should not be treated as a DNSKEY in validation,
166 even when it was cached with Indeterminate validation state before
169 # send the query with +CD so the record ends up cached with Indeterminate validation state
170 query
= dns
.message
.make_query('non-apex-dnskey2.secure.example.', 'DNSKEY')
171 query
.flags |
= dns
.flags
.CD
173 res
= self
.sendUDPQuery(query
)
175 expectedDNSKEY
= dns
.rrset
.from_text('non-apex-dnskey2.secure.example.', 0, dns
.rdataclass
.IN
, 'DNSKEY', '256 3 13 CT6AJ4MEOtNDgj0+xLtTLGHf1WbLsKWZI8ONHOt/6q7hTjeWSnY/SGig1dIKZrHg+pJFUSPaxeShv48SYVRKEg==')
177 self
.assertRRsetInAnswer(res
, expectedDNSKEY
)
178 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
179 self
.assertMessageHasFlags(res
, ['QR', 'RD', 'RA', 'CD'], [])
181 # now ask again with +AD so it has to be validated from cache
182 query
= dns
.message
.make_query('non-apex-dnskey2.secure.example.', 'DNSKEY')
183 query
.flags |
= dns
.flags
.AD
185 res
= self
.sendUDPQuery(query
)
187 expectedDNSKEY
= dns
.rrset
.from_text('non-apex-dnskey2.secure.example.', 0, dns
.rdataclass
.IN
, 'DNSKEY', '256 3 13 CT6AJ4MEOtNDgj0+xLtTLGHf1WbLsKWZI8ONHOt/6q7hTjeWSnY/SGig1dIKZrHg+pJFUSPaxeShv48SYVRKEg==')
189 self
.assertRRsetInAnswer(res
, expectedDNSKEY
)
190 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
191 self
.assertMessageHasFlags(res
, ['QR', 'RD', 'RA', 'AD'], [])
193 def testNonApexDNSKEYANYQuery(self
):
195 a DNSKEY not at the apex of a zone should not be treated as a DNSKEY in validation,
196 even when it was cached with Indeterminate validation state before.
197 This code tests the ANY path which is separate from the qtype=DNSKEY path tested in testNonApexDNSKEY2
200 # send the query with +CD so the record ends up cached with Indeterminate validation state
201 query
= dns
.message
.make_query('non-apex-dnskey3.secure.example.', 'DNSKEY')
202 query
.flags |
= dns
.flags
.CD
204 res
= self
.sendUDPQuery(query
)
206 expectedDNSKEY
= dns
.rrset
.from_text('non-apex-dnskey3.secure.example.', 0, dns
.rdataclass
.IN
, 'DNSKEY', '256 3 13 DT6AJ4MEOtNDgj0+xLtTLGHf1WbLsKWZI8ONHOt/6q7hTjeWSnY/SGig1dIKZrHg+pJFUSPaxeShv48SYVRKEg==')
208 self
.assertRRsetInAnswer(res
, expectedDNSKEY
)
209 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
210 self
.assertMessageHasFlags(res
, ['QR', 'RD', 'RA', 'CD'], [])
212 # now ask again with +AD so it has to be validated from cache
213 query
= dns
.message
.make_query('non-apex-dnskey3.secure.example.', 'ANY')
214 query
.flags |
= dns
.flags
.AD
216 res
= self
.sendUDPQuery(query
)
218 expectedDNSKEY
= dns
.rrset
.from_text('non-apex-dnskey3.secure.example.', 0, dns
.rdataclass
.IN
, 'DNSKEY', '256 3 13 DT6AJ4MEOtNDgj0+xLtTLGHf1WbLsKWZI8ONHOt/6q7hTjeWSnY/SGig1dIKZrHg+pJFUSPaxeShv48SYVRKEg==')
220 self
.assertRRsetInAnswer(res
, expectedDNSKEY
)
221 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
222 self
.assertMessageHasFlags(res
, ['QR', 'RD', 'RA', 'AD'], [])
224 class UDPResponder(DatagramProtocol
):
225 def datagramReceived(self
, datagram
, address
):
226 request
= dns
.message
.from_wire(datagram
)
228 response
= dns
.message
.make_response(request
)
229 response
.flags
= dns
.flags
.AA
+ dns
.flags
.QR
231 if request
.edns
!= -1:
232 response
.set_rcode(dns
.rcode
.FORMERR
)
233 response
.use_edns(edns
=-1)
235 response
.additional
= []
237 if request
.question
[0].name
== dns
.name
.from_text('host1.insecure-formerr.example.') and request
.question
[0].rdtype
== dns
.rdatatype
.A
:
238 answer
= dns
.rrset
.from_text('host1.insecure-formerr.example.', 15, dns
.rdataclass
.IN
, 'A', '127.0.0.1')
239 response
.answer
.append(answer
)
240 elif request
.question
[0].name
== dns
.name
.from_text('insecure-formerr.example.') and request
.question
[0].rdtype
== dns
.rdatatype
.NS
:
241 answer
= dns
.rrset
.from_text('insecure-formerr.example.', 15, dns
.rdataclass
.IN
, 'NS', 'ns1.insecure-formerr.example.')
242 response
.answer
.append(answer
)
243 additional
= dns
.rrset
.from_text('ns1.insecure-formerr.example.', 15, dns
.rdataclass
.IN
, 'A', '127.0.0.2')
244 response
.additional
.append(additional
)
246 self
.transport
.write(response
.to_wire(), address
)