]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSSelfGenerated.py
dnsdist: Support setting the value of AA, AD and RA when spoofing
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSSelfGenerated.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 from dnsdisttests import DNSDistTest
5 from datetime import datetime, timedelta
6
class TestEDNSSelfGenerated(DNSDistTest):
    """
    Check that dnsdist sends correct EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..)

    Every test sends the same four kinds of queries (one per action
    configured in the template below) over both UDP and TCP, and only
    differs in the EDNS content of the query and in what is expected
    back.
    """

    _config_template = """
    addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def _selfGeneratedCases(self, label, **queryArgs):
        """
        Yield one (query, expectedResponse) pair for each self-generated
        answer configured in the template: rcode, tc, lua and spoof.

        label is the left-most label of the query names, queryArgs are
        handed to dns.message.make_query() untouched.

        our_payload=1042 is always passed to make_response(): it only
        matters when the query carries EDNS, since make_response() does
        not add EDNS to the response otherwise.
        """
        zone = 'edns-self.tests.powerdns.com.'

        # RCodeAction(DNSRCode.REFUSED)
        query = dns.message.make_query('%s.rcode.%s' % (label, zone), 'A', 'IN', **queryArgs)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        yield (query, expectedResponse)

        # TCAction()
        query = dns.message.make_query('%s.tc.%s' % (label, zone), 'A', 'IN', **queryArgs)
        # dnsdist sets RA = RD for TC responses, so clear RD in the query
        # before deriving the expected response from it
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        yield (query, expectedResponse)

        # LuaAction(luarule), which returns DNSAction.Nxdomain
        query = dns.message.make_query('%s.lua.%s' % (label, zone), 'A', 'IN', **queryArgs)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        yield (query, expectedResponse)

        # SpoofAction({'192.0.2.1', '192.0.2.2'})
        name = '%s.spoof.%s' % (label, zone)
        query = dns.message.make_query(name, 'A', 'IN', **queryArgs)
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        yield (query, expectedResponse)

    def _sendOverUDPAndTCP(self, query):
        """
        Send query over UDP, then over TCP, and yield the response
        received for each transport. No backend response is queued
        (response=None, useQueue=False).
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            yield receivedResponse

    def _checkEDNSResponses(self, query, expectedResponse, expectDO):
        """
        Check that, over both transports, the answer to query matches
        expectedResponse, has EDNS without any option, has the DO bit
        set if and only if expectDO is true, and advertises the
        configured payload size of 1042.
        """
        for receivedResponse in self._sendOverUDPAndTCP(query):
            self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
            if expectDO:
                self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
            else:
                self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
            # assertEqual, not the assertEquals alias which is gone in Python 3.12
            self.assertEqual(receivedResponse.payload, 1042)

    def testNoEDNS(self):
        """
        EDNS on Self-Generated: No existing EDNS
        """
        for (query, expectedResponse) in self._selfGeneratedCases('no-edns'):
            for receivedResponse in self._sendOverUDPAndTCP(query):
                self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=0
        """
        cases = self._selfGeneratedCases('edns-no-do', use_edns=True, payload=4096, want_dnssec=False)
        for (query, expectedResponse) in cases:
            self._checkEDNSResponses(query, expectedResponse, expectDO=False)

    def testWithEDNSWithDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=1
        """
        cases = self._selfGeneratedCases('edns-do', use_edns=True, payload=4096, want_dnssec=True)
        for (query, expectedResponse) in cases:
            self._checkEDNSResponses(query, expectedResponse, expectDO=True)

    def testWithEDNSNoOptions(self):
        """
        EDNS on Self-Generated: EDNS with options in the query

        The query carries an EDNS Client Subnet option; the response is
        checked with checkMessageEDNSWithoutOptions().
        """
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        cases = self._selfGeneratedCases('edns-options', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        for (query, expectedResponse) in cases:
            self._checkEDNSResponses(query, expectedResponse, expectDO=True)
262
263
class TestEDNSSelfGeneratedDisabled(DNSDistTest):
    """
    Check that dnsdist does not send EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..) when disabled
    """

    _config_template = """
    setAddEDNSToSelfGeneratedResponses(false)

    addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated (disabled): EDNS with DO=0
        """
        def makeQuery(qname):
            # every query of this test carries EDNS with DO=0
            return dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)

        def expectNoEDNS(question, expected):
            # same check over UDP then TCP, without queueing a backend response
            for send in (self.sendUDPQuery, self.sendTCPQuery):
                received = send(question, response=None, useQueue=False)[1]
                self.checkMessageNoEDNS(expected, received)

        # RCodeAction(DNSRCode.REFUSED)
        rcodeQuery = makeQuery('edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.')
        refused = dns.message.make_response(rcodeQuery)
        refused.set_rcode(dns.rcode.REFUSED)
        expectNoEDNS(rcodeQuery, refused)

        # TCAction()
        tcQuery = makeQuery('edns-no-do.tc.edns-self-disabled.tests.powerdns.com.')
        # dnsdist sets RA = RD for TC responses
        tcQuery.flags = tcQuery.flags & ~dns.flags.RD
        truncated = dns.message.make_response(tcQuery)
        truncated.flags = truncated.flags | dns.flags.TC
        expectNoEDNS(tcQuery, truncated)

        # LuaAction(luarule)
        luaQuery = makeQuery('edns-no-do.lua.edns-self-disabled.tests.powerdns.com.')
        nxdomain = dns.message.make_response(luaQuery)
        nxdomain.set_rcode(dns.rcode.NXDOMAIN)
        expectNoEDNS(luaQuery, nxdomain)

        # SpoofAction({'192.0.2.1', '192.0.2.2'})
        spoofName = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
        spoofQuery = makeQuery(spoofName)
        # dnsdist sets RA = RD for spoofed responses
        spoofQuery.flags = spoofQuery.flags & ~dns.flags.RD
        spoofed = dns.message.make_response(spoofQuery)
        spoofedRRSet = dns.rrset.from_text(spoofName,
                                           60,
                                           dns.rdataclass.IN,
                                           dns.rdatatype.A,
                                           '192.0.2.1', '192.0.2.2')
        spoofed.answer.append(spoofedRRSet)
        expectNoEDNS(spoofQuery, spoofed)