]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSSelfGenerated.py
Merge pull request #13861 from omoerbeek/rec-rpzloader-tidy
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSSelfGenerated.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 from dnsdisttests import DNSDistTest
5 from datetime import datetime, timedelta
6
7 class TestEDNSSelfGenerated(DNSDistTest):
8 """
9 Check that dnsdist sends correct EDNS data on
10 self-generated (RCodeAction(), TCAction(), Lua..)
11 """
12
13 _config_template = """
14 addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
15 addAction("tc.edns-self.tests.powerdns.com.", TCAction())
16
17 function luarule(dq)
18 return DNSAction.Nxdomain, ""
19 end
20
21 addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))
22
23 addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
24
25 setPayloadSizeOnSelfGeneratedAnswers(1042)
26
27 newServer{address="127.0.0.1:%s"}
28 """
29
30 def testNoEDNS(self):
31 """
32 EDNS on Self-Generated: No existing EDNS
33 """
34 name = 'no-edns.rcode.edns-self.tests.powerdns.com.'
35 query = dns.message.make_query(name, 'A', 'IN')
36 query.flags &= ~dns.flags.RD
37 expectedResponse = dns.message.make_response(query)
38 expectedResponse.set_rcode(dns.rcode.REFUSED)
39
40 for method in ("sendUDPQuery", "sendTCPQuery"):
41 sender = getattr(self, method)
42 (_, receivedResponse) = sender(query, response=None, useQueue=False)
43 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
44
45 name = 'no-edns.tc.edns-self.tests.powerdns.com.'
46 query = dns.message.make_query(name, 'A', 'IN')
47 # dnsdist sets RA = RD for TC responses
48 query.flags &= ~dns.flags.RD
49 expectedResponse = dns.message.make_response(query)
50 expectedResponse.flags |= dns.flags.TC
51
52 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
53 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
54
55 name = 'no-edns.lua.edns-self.tests.powerdns.com.'
56 query = dns.message.make_query(name, 'A', 'IN')
57 expectedResponse = dns.message.make_response(query)
58 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
59
60 for method in ("sendUDPQuery", "sendTCPQuery"):
61 sender = getattr(self, method)
62 (_, receivedResponse) = sender(query, response=None, useQueue=False)
63 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
64
65 name = 'no-edns.spoof.edns-self.tests.powerdns.com.'
66 query = dns.message.make_query(name, 'A', 'IN')
67 # dnsdist set RA = RD for spoofed responses
68 query.flags &= ~dns.flags.RD
69 expectedResponse = dns.message.make_response(query, our_payload=1042)
70 expectedResponse.answer.append(dns.rrset.from_text(name,
71 60,
72 dns.rdataclass.IN,
73 dns.rdatatype.A,
74 '192.0.2.1', '192.0.2.2'))
75
76 for method in ("sendUDPQuery", "sendTCPQuery"):
77 sender = getattr(self, method)
78 (_, receivedResponse) = sender(query, response=None, useQueue=False)
79 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
80
81 def testWithEDNSNoDO(self):
82 """
83 EDNS on Self-Generated: EDNS with DO=0
84 """
85 name = 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
86 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
87 query.flags &= ~dns.flags.RD
88 expectedResponse = dns.message.make_response(query, our_payload=1042)
89 expectedResponse.set_rcode(dns.rcode.REFUSED)
90
91 for method in ("sendUDPQuery", "sendTCPQuery"):
92 sender = getattr(self, method)
93 (_, receivedResponse) = sender(query, response=None, useQueue=False)
94 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
95 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
96 self.assertEqual(receivedResponse.payload, 1042)
97
98 name = 'edns-no-do.tc.edns-self.tests.powerdns.com.'
99 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
100 # dnsdist sets RA = RD for TC responses
101 query.flags &= ~dns.flags.RD
102 expectedResponse = dns.message.make_response(query, our_payload=1042)
103 expectedResponse.flags |= dns.flags.TC
104
105 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
106 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
107 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
108 self.assertEqual(receivedResponse.payload, 1042)
109
110 name = 'edns-no-do.lua.edns-self.tests.powerdns.com.'
111 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
112 expectedResponse = dns.message.make_response(query, our_payload=1042)
113 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
114
115 for method in ("sendUDPQuery", "sendTCPQuery"):
116 sender = getattr(self, method)
117 (_, receivedResponse) = sender(query, response=None, useQueue=False)
118 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
119 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
120 self.assertEqual(receivedResponse.payload, 1042)
121
122 name = 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
123 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
124 # dnsdist set RA = RD for spoofed responses
125 query.flags &= ~dns.flags.RD
126 expectedResponse = dns.message.make_response(query, our_payload=1042)
127 expectedResponse.answer.append(dns.rrset.from_text(name,
128 60,
129 dns.rdataclass.IN,
130 dns.rdatatype.A,
131 '192.0.2.1', '192.0.2.2'))
132
133 for method in ("sendUDPQuery", "sendTCPQuery"):
134 sender = getattr(self, method)
135 (_, receivedResponse) = sender(query, response=None, useQueue=False)
136 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
137 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
138 self.assertEqual(receivedResponse.payload, 1042)
139
140 def testWithEDNSWithDO(self):
141 """
142 EDNS on Self-Generated: EDNS with DO=1
143 """
144 name = 'edns-do.rcode.edns-self.tests.powerdns.com.'
145 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
146 query.flags &= ~dns.flags.RD
147 expectedResponse = dns.message.make_response(query, our_payload=1042)
148 expectedResponse.set_rcode(dns.rcode.REFUSED)
149
150 for method in ("sendUDPQuery", "sendTCPQuery"):
151 sender = getattr(self, method)
152 (_, receivedResponse) = sender(query, response=None, useQueue=False)
153 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
154 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
155 self.assertEqual(receivedResponse.payload, 1042)
156
157 name = 'edns-do.tc.edns-self.tests.powerdns.com.'
158 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
159 # dnsdist sets RA = RD for TC responses
160 query.flags &= ~dns.flags.RD
161 expectedResponse = dns.message.make_response(query, our_payload=1042)
162 expectedResponse.flags |= dns.flags.TC
163
164 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
165 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
166 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
167 self.assertEqual(receivedResponse.payload, 1042)
168
169 name = 'edns-do.lua.edns-self.tests.powerdns.com.'
170 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
171 expectedResponse = dns.message.make_response(query, our_payload=1042)
172 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
173
174 for method in ("sendUDPQuery", "sendTCPQuery"):
175 sender = getattr(self, method)
176 (_, receivedResponse) = sender(query, response=None, useQueue=False)
177 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
178 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
179 self.assertEqual(receivedResponse.payload, 1042)
180
181 name = 'edns-do.spoof.edns-self.tests.powerdns.com.'
182 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
183 # dnsdist set RA = RD for spoofed responses
184 query.flags &= ~dns.flags.RD
185 expectedResponse = dns.message.make_response(query, our_payload=1042)
186 expectedResponse.answer.append(dns.rrset.from_text(name,
187 60,
188 dns.rdataclass.IN,
189 dns.rdatatype.A,
190 '192.0.2.1', '192.0.2.2'))
191
192 for method in ("sendUDPQuery", "sendTCPQuery"):
193 sender = getattr(self, method)
194 (_, receivedResponse) = sender(query, response=None, useQueue=False)
195 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
196 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
197 self.assertEqual(receivedResponse.payload, 1042)
198
199 def testWithEDNSNoOptions(self):
200 """
201 EDNS on Self-Generated: EDNS with options in the query
202 """
203 name = 'edns-options.rcode.edns-self.tests.powerdns.com.'
204 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
205 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
206 query.flags &= ~dns.flags.RD
207 expectedResponse = dns.message.make_response(query, our_payload=1042)
208 expectedResponse.set_rcode(dns.rcode.REFUSED)
209
210 for method in ("sendUDPQuery", "sendTCPQuery"):
211 sender = getattr(self, method)
212 (_, receivedResponse) = sender(query, response=None, useQueue=False)
213 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
214 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
215 self.assertEqual(receivedResponse.payload, 1042)
216
217 name = 'edns-options.tc.edns-self.tests.powerdns.com.'
218 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
219 # dnsdist sets RA = RD for TC responses
220 query.flags &= ~dns.flags.RD
221 expectedResponse = dns.message.make_response(query, our_payload=1042)
222 expectedResponse.flags |= dns.flags.TC
223
224 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
225 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
226 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
227 self.assertEqual(receivedResponse.payload, 1042)
228
229 name = 'edns-options.lua.edns-self.tests.powerdns.com.'
230 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
231 expectedResponse = dns.message.make_response(query, our_payload=1042)
232 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
233
234 for method in ("sendUDPQuery", "sendTCPQuery"):
235 sender = getattr(self, method)
236 (_, receivedResponse) = sender(query, response=None, useQueue=False)
237 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
238 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
239 self.assertEqual(receivedResponse.payload, 1042)
240
241 name = 'edns-options.spoof.edns-self.tests.powerdns.com.'
242 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
243 # dnsdist set RA = RD for spoofed responses
244 query.flags &= ~dns.flags.RD
245 expectedResponse = dns.message.make_response(query, our_payload=1042)
246 expectedResponse.answer.append(dns.rrset.from_text(name,
247 60,
248 dns.rdataclass.IN,
249 dns.rdatatype.A,
250 '192.0.2.1', '192.0.2.2'))
251
252 for method in ("sendUDPQuery", "sendTCPQuery"):
253 sender = getattr(self, method)
254 (_, receivedResponse) = sender(query, response=None, useQueue=False)
255 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
256 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
257 self.assertEqual(receivedResponse.payload, 1042)
258
259
260 class TestEDNSSelfGeneratedDisabled(DNSDistTest):
261 """
262 Check that dnsdist does not send EDNS data on
263 self-generated (RCodeAction(), TCAction(), Lua..) when disabled
264 """
265
266 _config_template = """
267 setAddEDNSToSelfGeneratedResponses(false)
268
269 addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
270 addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())
271
272 function luarule(dq)
273 return DNSAction.Nxdomain, ""
274 end
275
276 addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))
277
278 addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
279
280 setPayloadSizeOnSelfGeneratedAnswers(1042)
281
282 newServer{address="127.0.0.1:%s"}
283 """
284
285 def testWithEDNSNoDO(self):
286 """
287 EDNS on Self-Generated (disabled): EDNS with DO=0
288 """
289 name = 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
290 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
291 query.flags &= ~dns.flags.RD
292 expectedResponse = dns.message.make_response(query)
293 expectedResponse.set_rcode(dns.rcode.REFUSED)
294
295 for method in ("sendUDPQuery", "sendTCPQuery"):
296 sender = getattr(self, method)
297 (_, receivedResponse) = sender(query, response=None, useQueue=False)
298 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
299
300 name = 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
301 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
302 # dnsdist sets RA = RD for TC responses
303 query.flags &= ~dns.flags.RD
304 expectedResponse = dns.message.make_response(query)
305 expectedResponse.flags |= dns.flags.TC
306
307 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
308 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
309
310 name = 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
311 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
312 expectedResponse = dns.message.make_response(query)
313 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
314
315 for method in ("sendUDPQuery", "sendTCPQuery"):
316 sender = getattr(self, method)
317 (_, receivedResponse) = sender(query, response=None, useQueue=False)
318 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
319
320 name = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
321 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
322 # dnsdist set RA = RD for spoofed responses
323 query.flags &= ~dns.flags.RD
324 expectedResponse = dns.message.make_response(query)
325 expectedResponse.answer.append(dns.rrset.from_text(name,
326 60,
327 dns.rdataclass.IN,
328 dns.rdatatype.A,
329 '192.0.2.1', '192.0.2.2'))
330
331 for method in ("sendUDPQuery", "sendTCPQuery"):
332 sender = getattr(self, method)
333 (_, receivedResponse) = sender(query, response=None, useQueue=False)
334 self.checkMessageNoEDNS(expectedResponse, receivedResponse)