]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_EDNSSelfGenerated.py
Merge pull request #14078 from rgacogne/ddist-harvest-quic
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSSelfGenerated.py
CommitLineData
e7c732b8
RG
1#!/usr/bin/env python
2import dns
3import clientsubnetoption
4from dnsdisttests import DNSDistTest
5from datetime import datetime, timedelta
6
class TestEDNSSelfGenerated(DNSDistTest):
    """
    Check that dnsdist sends correct EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..) responses.

    The configuration below installs one rule per kind of self-generated
    answer (REFUSED rcode, truncation, Lua-returned NXDOMAIN, spoofed A
    records) and sets the payload size advertised on such answers to 1042.
    Every test sends one query per rule and checks the EDNS state of the
    answer returned by dnsdist.
    """

    _config_template = """
    addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    # Must match the value passed to setPayloadSizeOnSelfGeneratedAnswers()
    # in _config_template above.
    _selfGeneratedPayloadSize = 1042

    @staticmethod
    def _spoofedRRSet(name):
        """
        Return the A rrset (TTL 60) holding the two addresses configured
        in the SpoofAction() rule, owned by `name`.
        """
        return dns.rrset.from_text(name,
                                   60,
                                   dns.rdataclass.IN,
                                   dns.rdatatype.A,
                                   '192.0.2.1', '192.0.2.2')

    def _checkSelfGeneratedResponse(self, query, expectedResponse, expectDO=None, udpOnly=False):
        """
        Send `query` to dnsdist and compare the answer with `expectedResponse`.

        query            -- the DNS query to send
        expectedResponse -- the message the answer is compared against
        expectDO         -- None: the answer must not carry EDNS at all;
                            True / False: the answer must carry EDNS without
                            options, with the DO bit respectively set / clear,
                            and advertise the configured payload size
        udpOnly          -- send over UDP only instead of UDP then TCP

        No backend response is queued (response=None, useQueue=False), so
        the answer has to come from dnsdist itself.
        """
        methods = ("sendUDPQuery",) if udpOnly else ("sendUDPQuery", "sendTCPQuery")
        for method in methods:
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)

            if expectDO is None:
                self.checkMessageNoEDNS(expectedResponse, receivedResponse)
                continue

            self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
            if expectDO:
                self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
            else:
                self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
            self.assertEqual(receivedResponse.payload, self._selfGeneratedPayloadSize)

    def testNoEDNS(self):
        """
        EDNS on Self-Generated: No existing EDNS
        """
        name = 'no-edns.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGeneratedResponse(query, expectedResponse)

        name = 'no-edns.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC
        # the TC rule is only exercised over UDP
        self._checkSelfGeneratedResponse(query, expectedResponse, udpOnly=True)

        name = 'no-edns.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGeneratedResponse(query, expectedResponse)

        name = 'no-edns.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        # NOTE(review): our_payload presumably has no effect here since the
        # query carries no EDNS - kept as in the original, confirm and drop.
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.answer.append(self._spoofedRRSet(name))
        self._checkSelfGeneratedResponse(query, expectedResponse)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=0
        """
        name = 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.flags |= dns.flags.TC
        # the TC rule is only exercised over UDP
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=False, udpOnly=True)

        name = 'edns-no-do.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.answer.append(self._spoofedRRSet(name))
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=False)

    def testWithEDNSWithDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=1
        """
        name = 'edns-do.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=True)

        name = 'edns-do.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.flags |= dns.flags.TC
        # the TC rule is only exercised over UDP
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=True, udpOnly=True)

        name = 'edns-do.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=True)

        name = 'edns-do.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.answer.append(self._spoofedRRSet(name))
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=True)

    def testWithEDNSNoOptions(self):
        """
        EDNS on Self-Generated: EDNS with options in the query

        The queries carry an EDNS Client Subnet option; the answers are
        checked with checkMessageEDNSWithoutOptions().
        """
        name = 'edns-options.rcode.edns-self.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=True)

        name = 'edns-options.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.flags |= dns.flags.TC
        # the TC rule is only exercised over UDP
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=True, udpOnly=True)

        name = 'edns-options.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=True)

        name = 'edns-options.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=self._selfGeneratedPayloadSize)
        expectedResponse.answer.append(self._spoofedRRSet(name))
        self._checkSelfGeneratedResponse(query, expectedResponse, expectDO=True)
e7c732b8
RG
258
259
class TestEDNSSelfGeneratedDisabled(DNSDistTest):
    """
    Check that dnsdist does not send EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..) when disabled

    Same rules as the enabled variant, but the configuration starts with
    setAddEDNSToSelfGeneratedResponses(false), so every answer is checked
    with checkMessageNoEDNS() even though the queries carry EDNS.
    """

    _config_template = """
    setAddEDNSToSelfGeneratedResponses(false)

    addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated (disabled): EDNS with DO=0
        """
        udpThenTCP = (self.sendUDPQuery, self.sendTCPQuery)

        def ednsQuery(qname):
            # EDNS-enabled A query advertising 4096 bytes, DO bit clear
            return dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)

        def verifyNoEDNS(question, wanted, senders):
            # no backend answer is queued: the reply comes from dnsdist itself
            for send in senders:
                (_, got) = send(question, response=None, useQueue=False)
                self.checkMessageNoEDNS(wanted, got)

        # REFUSED generated by RCodeAction()
        refusedQuery = ednsQuery('edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.')
        refusedQuery.flags &= ~dns.flags.RD
        refused = dns.message.make_response(refusedQuery)
        refused.set_rcode(dns.rcode.REFUSED)
        verifyNoEDNS(refusedQuery, refused, udpThenTCP)

        # truncated answer generated by TCAction(), UDP only
        truncatedQuery = ednsQuery('edns-no-do.tc.edns-self-disabled.tests.powerdns.com.')
        # dnsdist sets RA = RD for TC responses
        truncatedQuery.flags &= ~dns.flags.RD
        truncated = dns.message.make_response(truncatedQuery)
        truncated.flags |= dns.flags.TC
        verifyNoEDNS(truncatedQuery, truncated, (self.sendUDPQuery,))

        # NXDOMAIN returned by the Lua rule (RD left as set by make_query)
        nxdomainQuery = ednsQuery('edns-no-do.lua.edns-self-disabled.tests.powerdns.com.')
        nxdomain = dns.message.make_response(nxdomainQuery)
        nxdomain.set_rcode(dns.rcode.NXDOMAIN)
        verifyNoEDNS(nxdomainQuery, nxdomain, udpThenTCP)

        # A records generated by SpoofAction()
        spoofedName = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
        spoofedQuery = ednsQuery(spoofedName)
        # dnsdist sets RA = RD for spoofed responses
        spoofedQuery.flags &= ~dns.flags.RD
        spoofed = dns.message.make_response(spoofedQuery)
        addresses = dns.rrset.from_text(spoofedName,
                                        60,
                                        dns.rdataclass.IN,
                                        dns.rdatatype.A,
                                        '192.0.2.1', '192.0.2.2')
        spoofed.answer.append(addresses)
        verifyNoEDNS(spoofedQuery, spoofed, udpThenTCP)