]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_Interop.py
Make sure we can install unsigned packages.
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_Interop.py
1 import dns
2 import socket
3 import copy
4 import os
5 from recursortests import RecursorTest
6 from twisted.internet.protocol import DatagramProtocol
7 from twisted.internet import reactor
8 import threading
9
10 class testInterop(RecursorTest):
11 _confdir = 'Interop'
12
13 _config_template = """dnssec=validate
14 packetcache-ttl=0 # explicitly disable packetcache
15 forward-zones=undelegated.secure.example=%s.12
16 forward-zones+=undelegated.insecure.example=%s.12
17 """ % (os.environ['PREFIX'], os.environ['PREFIX'])
18
19 def testFORMERR(self):
20 """
21 #3841, when we encounter a server that does not understands OPT records
22 (or something else), we don't retry without EDNS in dnssec=validate mode
23 """
24 expected = dns.rrset.from_text('host1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.1')
25
26 query = dns.message.make_query('cname-to-formerr.secure.example.', 'A')
27 res = self.sendUDPQuery(query)
28
29 self.assertRcodeEqual(res, dns.rcode.NOERROR)
30 self.assertRRsetInAnswer(res, expected)
31
32 def testCNAMEWithLowerEntries(self):
33 """
34 #4158, When chasing down for DS/DNSKEY and we find a CNAME, skip a level
35 """
36 expected = dns.rrset.from_text('node1.insecure.sub2.secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.18')
37
38 query = dns.message.make_query('node1.insecure.sub2.secure.example.', 'A')
39 query.flags |= dns.flags.AD
40 res = self.sendUDPQuery(query)
41
42 self.assertRcodeEqual(res, dns.rcode.NOERROR)
43 self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
44 self.assertRRsetInAnswer(res, expected)
45
46 def testUndelegatedForwardedZoneExisting(self):
47 """
48 #4369. Ensure we SERVFAIL when forwarding to undelegated zones for a name that exists
49 """
50
51 query = dns.message.make_query('node1.undelegated.secure.example.', 'A')
52 query.flags |= dns.flags.AD
53
54 # twice, so we hit the record cache
55 self.sendUDPQuery(query)
56 res = self.sendUDPQuery(query)
57
58 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
59 self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
60
61 def testUndelegatedForwardedZoneNXDOMAIN(self):
62 """
63 #4369. Ensure we SERVFAIL when forwarding to undelegated zones for a name that does not exist
64 """
65
66 query = dns.message.make_query('node2.undelegated.secure.example.', 'A')
67 query.flags |= dns.flags.AD
68
69 # twice, so we hit the negative record cache
70 self.sendUDPQuery(query)
71 res = self.sendUDPQuery(query)
72
73 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
74 self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
75
76 def testUndelegatedForwardedInsecureZoneExisting(self):
77 """
78 #4369. Ensure we answer when forwarding to an undelegated zone in an insecure zone for a name that exists
79 """
80
81 expected = dns.rrset.from_text('node1.undelegated.insecure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.22')
82 query = dns.message.make_query('node1.undelegated.insecure.example.', 'A')
83 query.flags |= dns.flags.AD
84
85 # twice, so we hit the record cache
86 self.sendUDPQuery(query)
87 res = self.sendUDPQuery(query)
88
89 self.assertRcodeEqual(res, dns.rcode.NOERROR)
90 self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
91 self.assertRRsetInAnswer(res, expected)
92
93 def testUndelegatedForwardedInsecureZoneNXDOMAIN(self):
94 """
95 #4369. Ensure we answer when forwarding to an undelegated zone in an insecure zone for a name that does not exist
96 """
97
98 query = dns.message.make_query('node2.undelegated.insecure.example.', 'A')
99 query.flags |= dns.flags.AD
100
101 # twice, so we hit the negative record cache
102 self.sendUDPQuery(query)
103 res = self.sendUDPQuery(query)
104
105 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
106 self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], [])
107
108 def testBothSecureCNAMEAtApex(self):
109 """
110 #4466: a CNAME at the apex of a secure domain to another secure domain made us use the wrong DNSKEY to validate
111 """
112 query = dns.message.make_query('cname-secure.example.', 'A')
113 query.flags |= dns.flags.AD
114
115 res = self.sendUDPQuery(query)
116 expectedCNAME = dns.rrset.from_text('cname-secure.example.', 0, dns.rdataclass.IN, 'CNAME', 'secure.example.')
117 expectedA = dns.rrset.from_text('secure.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.17')
118
119 self.assertRRsetInAnswer(res, expectedA)
120 self.assertRRsetInAnswer(res, expectedCNAME)
121 self.assertRcodeEqual(res, dns.rcode.NOERROR)
122 self.assertMessageHasFlags(res, ['QR', 'RD', 'RA', 'AD'], [])
123
124 @classmethod
125 def startResponders(cls):
126 print("Launching responders..")
127
128 address = cls._PREFIX + '.2'
129 port = 53
130
131 reactor.listenUDP(port, UDPResponder(), interface=address)
132
133 if not reactor.running:
134 cls._UDPResponder = threading.Thread(name='UDP Responder', target=reactor.run, args=(False,))
135 cls._UDPResponder.setDaemon(True)
136 cls._UDPResponder.start()
137
138 class UDPResponder(DatagramProtocol):
139 def datagramReceived(self, datagram, address):
140 request = dns.message.from_wire(datagram)
141
142 response = dns.message.make_response(request)
143 response.flags = dns.flags.AA + dns.flags.QR
144
145 if request.edns != -1:
146 response.set_rcode(dns.rcode.FORMERR)
147 response.edns = -1
148 response.additional = []
149 else:
150 if request.question[0].name == dns.name.from_text('host1.insecure-formerr.example.') and request.question[0].rdtype == dns.rdatatype.A:
151 answer = dns.rrset.from_text('host1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.1')
152 response.answer.append(answer)
153 elif request.question[0].name == dns.name.from_text('insecure-formerr.example.') and request.question[0].rdtype == dns.rdatatype.NS:
154 answer = dns.rrset.from_text('insecure-formerr.example.', 15, dns.rdataclass.IN, 'NS', 'ns1.insecure-formerr.example.')
155 response.answer.append(answer)
156 additional = dns.rrset.from_text('ns1.insecure-formerr.example.', 15, dns.rdataclass.IN, 'A', '127.0.0.2')
157 response.additional.append(additional)
158
159 self.transport.write(response.to_wire(), address)