]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_EDNSSelfGenerated.py
Merge pull request #8795 from omoerbeek/rec-lua-docs-policytag
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSSelfGenerated.py
CommitLineData
e7c732b8
RG
1#!/usr/bin/env python
2import dns
3import clientsubnetoption
4from dnsdisttests import DNSDistTest
5from datetime import datetime, timedelta
6
7class TestEDNSSelfGenerated(DNSDistTest):
8 """
9 Check that dnsdist sends correct EDNS data on
10 self-generated (RCodeAction(), TCAction(), Lua..)
11 """
12
13 _config_template = """
d3ec24f9 14 addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
e7c732b8
RG
15 addAction("tc.edns-self.tests.powerdns.com.", TCAction())
16
17 function luarule(dq)
18 return DNSAction.Nxdomain, ""
19 end
20
955de53b 21 addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))
e7c732b8
RG
22
23 addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
24
25 setPayloadSizeOnSelfGeneratedAnswers(1042)
26
27 newServer{address="127.0.0.1:%s"}
28 """
29
30 def testNoEDNS(self):
31 """
32 EDNS on Self-Generated: No existing EDNS
33 """
34 name = 'no-edns.rcode.edns-self.tests.powerdns.com.'
35 query = dns.message.make_query(name, 'A', 'IN')
7af22479 36 query.flags &= ~dns.flags.RD
e7c732b8
RG
37 expectedResponse = dns.message.make_response(query)
38 expectedResponse.set_rcode(dns.rcode.REFUSED)
39
6ca2e796
RG
40 for method in ("sendUDPQuery", "sendTCPQuery"):
41 sender = getattr(self, method)
42 (_, receivedResponse) = sender(query, response=None, useQueue=False)
43 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
e7c732b8
RG
44
45 name = 'no-edns.tc.edns-self.tests.powerdns.com.'
46 query = dns.message.make_query(name, 'A', 'IN')
955b9377
RG
47 # dnsdist sets RA = RD for TC responses
48 query.flags &= ~dns.flags.RD
e7c732b8
RG
49 expectedResponse = dns.message.make_response(query)
50 expectedResponse.flags |= dns.flags.TC
51
6ca2e796
RG
52 for method in ("sendUDPQuery", "sendTCPQuery"):
53 sender = getattr(self, method)
54 (_, receivedResponse) = sender(query, response=None, useQueue=False)
55 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
e7c732b8
RG
56
57 name = 'no-edns.lua.edns-self.tests.powerdns.com.'
58 query = dns.message.make_query(name, 'A', 'IN')
59 expectedResponse = dns.message.make_response(query)
60 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
61
6ca2e796
RG
62 for method in ("sendUDPQuery", "sendTCPQuery"):
63 sender = getattr(self, method)
64 (_, receivedResponse) = sender(query, response=None, useQueue=False)
65 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
e7c732b8
RG
66
67 name = 'no-edns.spoof.edns-self.tests.powerdns.com.'
68 query = dns.message.make_query(name, 'A', 'IN')
69 # dnsdist set RA = RD for spoofed responses
70 query.flags &= ~dns.flags.RD
d70d5ad3 71 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
72 expectedResponse.answer.append(dns.rrset.from_text(name,
73 60,
74 dns.rdataclass.IN,
75 dns.rdatatype.A,
76 '192.0.2.1', '192.0.2.2'))
77
6ca2e796
RG
78 for method in ("sendUDPQuery", "sendTCPQuery"):
79 sender = getattr(self, method)
80 (_, receivedResponse) = sender(query, response=None, useQueue=False)
81 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
e7c732b8
RG
82
83 def testWithEDNSNoDO(self):
84 """
85 EDNS on Self-Generated: EDNS with DO=0
86 """
87 name = 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
88 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
7af22479 89 query.flags &= ~dns.flags.RD
d70d5ad3 90 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
91 expectedResponse.set_rcode(dns.rcode.REFUSED)
92
6ca2e796
RG
93 for method in ("sendUDPQuery", "sendTCPQuery"):
94 sender = getattr(self, method)
95 (_, receivedResponse) = sender(query, response=None, useQueue=False)
96 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
97 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
98 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
99
100 name = 'edns-no-do.tc.edns-self.tests.powerdns.com.'
101 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
955b9377
RG
102 # dnsdist sets RA = RD for TC responses
103 query.flags &= ~dns.flags.RD
d70d5ad3 104 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
105 expectedResponse.flags |= dns.flags.TC
106
6ca2e796
RG
107 for method in ("sendUDPQuery", "sendTCPQuery"):
108 sender = getattr(self, method)
109 (_, receivedResponse) = sender(query, response=None, useQueue=False)
110 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
111 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
112 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
113
114 name = 'edns-no-do.lua.edns-self.tests.powerdns.com.'
115 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
d70d5ad3 116 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
117 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
118
6ca2e796
RG
119 for method in ("sendUDPQuery", "sendTCPQuery"):
120 sender = getattr(self, method)
121 (_, receivedResponse) = sender(query, response=None, useQueue=False)
122 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
123 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
124 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
125
126 name = 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
127 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
128 # dnsdist set RA = RD for spoofed responses
129 query.flags &= ~dns.flags.RD
d70d5ad3 130 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
131 expectedResponse.answer.append(dns.rrset.from_text(name,
132 60,
133 dns.rdataclass.IN,
134 dns.rdatatype.A,
135 '192.0.2.1', '192.0.2.2'))
136
6ca2e796
RG
137 for method in ("sendUDPQuery", "sendTCPQuery"):
138 sender = getattr(self, method)
139 (_, receivedResponse) = sender(query, response=None, useQueue=False)
140 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
141 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
142 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
143
144 def testWithEDNSWithDO(self):
145 """
146 EDNS on Self-Generated: EDNS with DO=1
147 """
148 name = 'edns-do.rcode.edns-self.tests.powerdns.com.'
149 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
7af22479 150 query.flags &= ~dns.flags.RD
d70d5ad3 151 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
152 expectedResponse.set_rcode(dns.rcode.REFUSED)
153
6ca2e796
RG
154 for method in ("sendUDPQuery", "sendTCPQuery"):
155 sender = getattr(self, method)
156 (_, receivedResponse) = sender(query, response=None, useQueue=False)
157 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
158 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
159 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
160
161 name = 'edns-do.tc.edns-self.tests.powerdns.com.'
162 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
955b9377
RG
163 # dnsdist sets RA = RD for TC responses
164 query.flags &= ~dns.flags.RD
d70d5ad3 165 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
166 expectedResponse.flags |= dns.flags.TC
167
6ca2e796
RG
168 for method in ("sendUDPQuery", "sendTCPQuery"):
169 sender = getattr(self, method)
170 (_, receivedResponse) = sender(query, response=None, useQueue=False)
171 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
172 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
173 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
174
175 name = 'edns-do.lua.edns-self.tests.powerdns.com.'
176 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
d70d5ad3 177 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
178 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
179
6ca2e796
RG
180 for method in ("sendUDPQuery", "sendTCPQuery"):
181 sender = getattr(self, method)
182 (_, receivedResponse) = sender(query, response=None, useQueue=False)
183 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
184 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
185 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
186
187 name = 'edns-do.spoof.edns-self.tests.powerdns.com.'
188 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
189 # dnsdist set RA = RD for spoofed responses
190 query.flags &= ~dns.flags.RD
d70d5ad3 191 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
192 expectedResponse.answer.append(dns.rrset.from_text(name,
193 60,
194 dns.rdataclass.IN,
195 dns.rdatatype.A,
196 '192.0.2.1', '192.0.2.2'))
197
6ca2e796
RG
198 for method in ("sendUDPQuery", "sendTCPQuery"):
199 sender = getattr(self, method)
200 (_, receivedResponse) = sender(query, response=None, useQueue=False)
201 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
202 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
203 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
204
205 def testWithEDNSNoOptions(self):
206 """
207 EDNS on Self-Generated: EDNS with options in the query
208 """
209 name = 'edns-options.rcode.edns-self.tests.powerdns.com.'
210 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
211 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
7af22479 212 query.flags &= ~dns.flags.RD
d70d5ad3 213 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
214 expectedResponse.set_rcode(dns.rcode.REFUSED)
215
6ca2e796
RG
216 for method in ("sendUDPQuery", "sendTCPQuery"):
217 sender = getattr(self, method)
218 (_, receivedResponse) = sender(query, response=None, useQueue=False)
219 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
220 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
221 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
222
223 name = 'edns-options.tc.edns-self.tests.powerdns.com.'
224 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
955b9377
RG
225 # dnsdist sets RA = RD for TC responses
226 query.flags &= ~dns.flags.RD
d70d5ad3 227 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
228 expectedResponse.flags |= dns.flags.TC
229
6ca2e796
RG
230 for method in ("sendUDPQuery", "sendTCPQuery"):
231 sender = getattr(self, method)
232 (_, receivedResponse) = sender(query, response=None, useQueue=False)
233 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
234 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
235 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
236
237 name = 'edns-options.lua.edns-self.tests.powerdns.com.'
238 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
d70d5ad3 239 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
240 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
241
6ca2e796
RG
242 for method in ("sendUDPQuery", "sendTCPQuery"):
243 sender = getattr(self, method)
244 (_, receivedResponse) = sender(query, response=None, useQueue=False)
245 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
246 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
247 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
248
249 name = 'edns-options.spoof.edns-self.tests.powerdns.com.'
250 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
251 # dnsdist set RA = RD for spoofed responses
252 query.flags &= ~dns.flags.RD
d70d5ad3 253 expectedResponse = dns.message.make_response(query, our_payload=1042)
e7c732b8
RG
254 expectedResponse.answer.append(dns.rrset.from_text(name,
255 60,
256 dns.rdataclass.IN,
257 dns.rdatatype.A,
258 '192.0.2.1', '192.0.2.2'))
259
6ca2e796
RG
260 for method in ("sendUDPQuery", "sendTCPQuery"):
261 sender = getattr(self, method)
262 (_, receivedResponse) = sender(query, response=None, useQueue=False)
263 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
264 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
265 self.assertEquals(receivedResponse.payload, 1042)
e7c732b8
RG
266
267
268class TestEDNSSelfGeneratedDisabled(DNSDistTest):
269 """
270 Check that dnsdist does not send EDNS data on
271 self-generated (RCodeAction(), TCAction(), Lua..) when disabled
272 """
273
274 _config_template = """
275 setAddEDNSToSelfGeneratedResponses(false)
276
d3ec24f9 277 addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
e7c732b8
RG
278 addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())
279
280 function luarule(dq)
281 return DNSAction.Nxdomain, ""
282 end
283
955de53b 284 addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))
e7c732b8
RG
285
286 addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
287
288 setPayloadSizeOnSelfGeneratedAnswers(1042)
289
290 newServer{address="127.0.0.1:%s"}
291 """
292
293 def testWithEDNSNoDO(self):
294 """
295 EDNS on Self-Generated (disabled): EDNS with DO=0
296 """
297 name = 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
298 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
7af22479 299 query.flags &= ~dns.flags.RD
e7c732b8
RG
300 expectedResponse = dns.message.make_response(query)
301 expectedResponse.set_rcode(dns.rcode.REFUSED)
302
6ca2e796
RG
303 for method in ("sendUDPQuery", "sendTCPQuery"):
304 sender = getattr(self, method)
305 (_, receivedResponse) = sender(query, response=None, useQueue=False)
306 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
e7c732b8
RG
307
308 name = 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
309 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
955b9377
RG
310 # dnsdist sets RA = RD for TC responses
311 query.flags &= ~dns.flags.RD
e7c732b8
RG
312 expectedResponse = dns.message.make_response(query)
313 expectedResponse.flags |= dns.flags.TC
314
6ca2e796
RG
315 for method in ("sendUDPQuery", "sendTCPQuery"):
316 sender = getattr(self, method)
317 (_, receivedResponse) = sender(query, response=None, useQueue=False)
318 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
e7c732b8
RG
319
320 name = 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
321 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
322 expectedResponse = dns.message.make_response(query)
323 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
324
6ca2e796
RG
325 for method in ("sendUDPQuery", "sendTCPQuery"):
326 sender = getattr(self, method)
327 (_, receivedResponse) = sender(query, response=None, useQueue=False)
328 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
e7c732b8
RG
329
330 name = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
331 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
332 # dnsdist set RA = RD for spoofed responses
333 query.flags &= ~dns.flags.RD
334 expectedResponse = dns.message.make_response(query)
335 expectedResponse.answer.append(dns.rrset.from_text(name,
336 60,
337 dns.rdataclass.IN,
338 dns.rdatatype.A,
339 '192.0.2.1', '192.0.2.2'))
340
6ca2e796
RG
341 for method in ("sendUDPQuery", "sendTCPQuery"):
342 sender = getattr(self, method)
343 (_, receivedResponse) = sender(query, response=None, useQueue=False)
344 self.checkMessageNoEDNS(expectedResponse, receivedResponse)