]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_EDNSSelfGenerated.py
Merge pull request #8141 from rgacogne/dnsdist-ocsp
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSSelfGenerated.py
CommitLineData
e7c732b8
RG
1#!/usr/bin/env python
2import dns
3import clientsubnetoption
4from dnsdisttests import DNSDistTest
5from datetime import datetime, timedelta
6
class TestEDNSSelfGenerated(DNSDistTest):
    """
    Check that dnsdist sends correct EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..)

    The configuration below sets the payload size used on self-generated
    answers to 1042, which is the value the EDNS-enabled tests assert on.
    Every scenario is exercised over both UDP and TCP.
    """

    _config_template = """
    addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def _checkSelfGeneratedNoEDNS(self, query, expectedResponse):
        """
        Send `query` over UDP then TCP and compare each received answer
        against `expectedResponse` with checkMessageNoEDNS().

        No backend response is queued (response=None, useQueue=False), so
        the answer has to come from dnsdist itself.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def _checkSelfGeneratedEDNS(self, query, expectedResponse, expectDO):
        """
        Send `query` over UDP then TCP and, for each received answer:
        - compare it against `expectedResponse` with
          checkMessageEDNSWithoutOptions(),
        - check that the EDNS DO flag is set when `expectDO` is true and
          clear otherwise,
        - check that the advertised EDNS payload size is 1042.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
            if expectDO:
                self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
            else:
                self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
            # assertEqual, not the deprecated assertEquals alias, which
            # was removed from unittest in Python 3.12
            self.assertEqual(receivedResponse.payload, 1042)

    def testNoEDNS(self):
        """
        EDNS on Self-Generated: No existing EDNS
        """
        name = 'no-edns.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGeneratedNoEDNS(query, expectedResponse)

        name = 'no-edns.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC
        self._checkSelfGeneratedNoEDNS(query, expectedResponse)

        name = 'no-edns.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGeneratedNoEDNS(query, expectedResponse)

        name = 'no-edns.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkSelfGeneratedNoEDNS(query, expectedResponse)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=0
        """
        name = 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=False)

    def testWithEDNSWithDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=1
        """
        name = 'edns-do.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=True)

        name = 'edns-do.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=True)

        name = 'edns-do.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=True)

        name = 'edns-do.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=True)

    def testWithEDNSNoOptions(self):
        """
        EDNS on Self-Generated: EDNS with options in the query

        Each query carries an EDNS Client Subnet option, advertises a
        payload size of 512 and sets DO; the answers are verified with
        checkMessageEDNSWithoutOptions().
        """
        name = 'edns-options.rcode.edns-self.tests.powerdns.com.'
        # the same ECS option object is reused for all four queries below
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=True)

        name = 'edns-options.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=True)

        name = 'edns-options.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=True)

        name = 'edns-options.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkSelfGeneratedEDNS(query, expectedResponse, expectDO=True)
e7c732b8
RG
254
255
class TestEDNSSelfGeneratedDisabled(DNSDistTest):
    """
    Check that dnsdist does not send EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..) when disabled

    The configuration calls setAddEDNSToSelfGeneratedResponses(false)
    before installing the same kind of rules as the enabled variant.
    """

    _config_template = """
    setAddEDNSToSelfGeneratedResponses(false)

    addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated (disabled): EDNS with DO=0

        Four queries (rcode, tc, lua, spoof) are sent with EDNS enabled
        and DO clear, each over UDP then TCP, and every answer is
        verified with checkMessageNoEDNS().
        """
        def ednsQuery(qname):
            # EDNS query advertising a 4096-byte payload, DO bit clear
            return dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)

        def verifyOnAllTransports(question, wanted):
            # nothing is queued for the backend: response=None, useQueue=False
            for transport in ("sendUDPQuery", "sendTCPQuery"):
                (_, answer) = getattr(self, transport)(question, response=None, useQueue=False)
                self.checkMessageNoEDNS(wanted, answer)

        # RCodeAction: REFUSED
        refusedQuery = ednsQuery('edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.')
        refusedResponse = dns.message.make_response(refusedQuery)
        refusedResponse.set_rcode(dns.rcode.REFUSED)
        verifyOnAllTransports(refusedQuery, refusedResponse)

        # TCAction: truncated answer
        truncatedQuery = ednsQuery('edns-no-do.tc.edns-self-disabled.tests.powerdns.com.')
        truncatedResponse = dns.message.make_response(truncatedQuery)
        truncatedResponse.flags |= dns.flags.TC
        verifyOnAllTransports(truncatedQuery, truncatedResponse)

        # LuaAction: NXDOMAIN
        nxdomainQuery = ednsQuery('edns-no-do.lua.edns-self-disabled.tests.powerdns.com.')
        nxdomainResponse = dns.message.make_response(nxdomainQuery)
        nxdomainResponse.set_rcode(dns.rcode.NXDOMAIN)
        verifyOnAllTransports(nxdomainQuery, nxdomainResponse)

        # SpoofAction: two A records
        spoofedName = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
        spoofedQuery = ednsQuery(spoofedName)
        # dnsdist set RA = RD for spoofed responses
        spoofedQuery.flags &= ~dns.flags.RD
        spoofedResponse = dns.message.make_response(spoofedQuery)
        spoofedRRSet = dns.rrset.from_text(spoofedName,
                                           60,
                                           dns.rdataclass.IN,
                                           dns.rdatatype.A,
                                           '192.0.2.1', '192.0.2.2')
        spoofedResponse.answer.append(spoofedRRSet)
        verifyOnAllTransports(spoofedQuery, spoofedResponse)