]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_Interop.py
5 from recursortests
import RecursorTest
6 from twisted
.internet
.protocol
import DatagramProtocol
7 from twisted
.internet
import reactor
10 class testInterop(RecursorTest
):
13 _config_template
= """dnssec=validate
14 packetcache-ttl=0 # explicitly disable packetcache
15 forward-zones=undelegated.secure.example=%s.12
16 forward-zones+=undelegated.insecure.example=%s.12
17 """ % (os
.environ
['PREFIX'], os
.environ
['PREFIX'])
19 def testFORMERR(self
):
21 #3841, when we encounter a server that does not understands OPT records
22 (or something else), we don't retry without EDNS in dnssec=validate mode
24 expected
= dns
.rrset
.from_text('host1.insecure-formerr.example.', 15, dns
.rdataclass
.IN
, 'A', '127.0.0.1')
26 query
= dns
.message
.make_query('cname-to-formerr.secure.example.', 'A')
27 res
= self
.sendUDPQuery(query
)
29 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
30 self
.assertRRsetInAnswer(res
, expected
)
32 def testCNAMEWithLowerEntries(self
):
34 #4158, When chasing down for DS/DNSKEY and we find a CNAME, skip a level
36 expected
= dns
.rrset
.from_text('node1.insecure.sub2.secure.example.', 0, dns
.rdataclass
.IN
, 'A', '192.0.2.18')
38 query
= dns
.message
.make_query('node1.insecure.sub2.secure.example.', 'A')
39 query
.flags |
= dns
.flags
.AD
40 res
= self
.sendUDPQuery(query
)
42 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
43 self
.assertMessageHasFlags(res
, ['QR', 'RA', 'RD'], [])
44 self
.assertRRsetInAnswer(res
, expected
)
46 def testUndelegatedForwardedZoneExisting(self
):
48 #4369. Ensure we SERVFAIL when forwarding to undelegated zones for a name that exists
51 query
= dns
.message
.make_query('node1.undelegated.secure.example.', 'A')
52 query
.flags |
= dns
.flags
.AD
54 # twice, so we hit the record cache
55 self
.sendUDPQuery(query
)
56 res
= self
.sendUDPQuery(query
)
58 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
59 self
.assertMessageHasFlags(res
, ['QR', 'RA', 'RD'], [])
61 def testUndelegatedForwardedZoneNXDOMAIN(self
):
63 #4369. Ensure we SERVFAIL when forwarding to undelegated zones for a name that does not exist
66 query
= dns
.message
.make_query('node2.undelegated.secure.example.', 'A')
67 query
.flags |
= dns
.flags
.AD
69 # twice, so we hit the negative record cache
70 self
.sendUDPQuery(query
)
71 res
= self
.sendUDPQuery(query
)
73 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
74 self
.assertMessageHasFlags(res
, ['QR', 'RA', 'RD'], [])
76 def testUndelegatedForwardedInsecureZoneExisting(self
):
78 #4369. Ensure we answer when forwarding to an undelegated zone in an insecure zone for a name that exists
81 expected
= dns
.rrset
.from_text('node1.undelegated.insecure.example.', 0, dns
.rdataclass
.IN
, 'A', '192.0.2.22')
82 query
= dns
.message
.make_query('node1.undelegated.insecure.example.', 'A')
83 query
.flags |
= dns
.flags
.AD
85 # twice, so we hit the record cache
86 self
.sendUDPQuery(query
)
87 res
= self
.sendUDPQuery(query
)
89 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
90 self
.assertMessageHasFlags(res
, ['QR', 'RA', 'RD'], [])
91 self
.assertRRsetInAnswer(res
, expected
)
93 def testUndelegatedForwardedInsecureZoneNXDOMAIN(self
):
95 #4369. Ensure we answer when forwarding to an undelegated zone in an insecure zone for a name that does not exist
98 query
= dns
.message
.make_query('node2.undelegated.insecure.example.', 'A')
99 query
.flags |
= dns
.flags
.AD
101 # twice, so we hit the negative record cache
102 self
.sendUDPQuery(query
)
103 res
= self
.sendUDPQuery(query
)
105 self
.assertRcodeEqual(res
, dns
.rcode
.NXDOMAIN
)
106 self
.assertMessageHasFlags(res
, ['QR', 'RA', 'RD'], [])
108 def testBothSecureCNAMEAtApex(self
):
110 #4466: a CNAME at the apex of a secure domain to another secure domain made us use the wrong DNSKEY to validate
112 query
= dns
.message
.make_query('cname-secure.example.', 'A')
113 query
.flags |
= dns
.flags
.AD
115 res
= self
.sendUDPQuery(query
)
116 expectedCNAME
= dns
.rrset
.from_text('cname-secure.example.', 0, dns
.rdataclass
.IN
, 'CNAME', 'secure.example.')
117 expectedA
= dns
.rrset
.from_text('secure.example.', 0, dns
.rdataclass
.IN
, 'A', '192.0.2.17')
119 self
.assertRRsetInAnswer(res
, expectedA
)
120 self
.assertRRsetInAnswer(res
, expectedCNAME
)
121 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
122 self
.assertMessageHasFlags(res
, ['QR', 'RD', 'RA', 'AD'], [])
125 def startResponders(cls
):
126 print("Launching responders..")
128 address
= cls
._PREFIX
+ '.2'
131 reactor
.listenUDP(port
, UDPResponder(), interface
=address
)
133 if not reactor
.running
:
134 cls
._UDPResponder
= threading
.Thread(name
='UDP Responder', target
=reactor
.run
, args
=(False,))
135 cls
._UDPResponder
.setDaemon(True)
136 cls
._UDPResponder
.start()
138 class UDPResponder(DatagramProtocol
):
139 def datagramReceived(self
, datagram
, address
):
140 request
= dns
.message
.from_wire(datagram
)
142 response
= dns
.message
.make_response(request
)
143 response
.flags
= dns
.flags
.AA
+ dns
.flags
.QR
145 if request
.edns
!= -1:
146 response
.set_rcode(dns
.rcode
.FORMERR
)
148 response
.additional
= []
150 if request
.question
[0].name
== dns
.name
.from_text('host1.insecure-formerr.example.') and request
.question
[0].rdtype
== dns
.rdatatype
.A
:
151 answer
= dns
.rrset
.from_text('host1.insecure-formerr.example.', 15, dns
.rdataclass
.IN
, 'A', '127.0.0.1')
152 response
.answer
.append(answer
)
153 elif request
.question
[0].name
== dns
.name
.from_text('insecure-formerr.example.') and request
.question
[0].rdtype
== dns
.rdatatype
.NS
:
154 answer
= dns
.rrset
.from_text('insecure-formerr.example.', 15, dns
.rdataclass
.IN
, 'NS', 'ns1.insecure-formerr.example.')
155 response
.answer
.append(answer
)
156 additional
= dns
.rrset
.from_text('ns1.insecure-formerr.example.', 15, dns
.rdataclass
.IN
, 'A', '127.0.0.2')
157 response
.additional
.append(additional
)
159 self
.transport
.write(response
.to_wire(), address
)