]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_EDNSSelfGenerated.py
dnsdist: Support setting the value of AA, AD and RA when spoofing
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSSelfGenerated.py
CommitLineData
e7c732b8
RG
1#!/usr/bin/env python
2import dns
3import clientsubnetoption
4from dnsdisttests import DNSDistTest
5from datetime import datetime, timedelta
6
class TestEDNSSelfGenerated(DNSDistTest):
    """
    Check that dnsdist sends correct EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..)
    """

    # One rule per kind of self-generated answer exercised below (fixed rcode,
    # truncation, Lua-returned NXDOMAIN, spoofed A records), each bound to its
    # own sub-domain of edns-self.tests.powerdns.com. The payload size set
    # here (1042) is the value the tests expect to read back from the answers.
    # NOTE(review): the "%s" placeholder is presumably filled in by
    # DNSDistTest with the backend port -- confirm against the base class.
    _config_template = """
    addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def _checkNoEDNSOverUDPAndTCP(self, query, expectedResponse):
        """
        Send `query` over UDP, then over TCP, and compare every answer
        against `expectedResponse` with checkMessageNoEDNS.

        :param query: the DNS query to send
        :param expectedResponse: the response the answer is compared against
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # the senders return a pair; only the received response is needed
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def _checkEDNSOverUDPAndTCP(self, query, expectedResponse, expectDO):
        """
        Send `query` over UDP, then over TCP, and for every answer:
        compare it against `expectedResponse` with
        checkMessageEDNSWithoutOptions, check the state of the DO bit and
        check that the advertised payload size is 1042.

        :param query: the DNS query to send
        :param expectedResponse: the response the answer is compared against
        :param expectDO: True if the DO bit must be set in the answer,
                         False if it must be cleared
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
            if expectDO:
                self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
            else:
                self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
            # assertEqual, not the deprecated assertEquals alias
            # (the alias was removed in Python 3.12)
            self.assertEqual(receivedResponse.payload, 1042)

    def testNoEDNS(self):
        """
        EDNS on Self-Generated: No existing EDNS

        The queries are built without EDNS, and every answer is checked
        with checkMessageNoEDNS.
        """
        name = 'no-edns.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkNoEDNSOverUDPAndTCP(query, expectedResponse)

        name = 'no-edns.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC
        self._checkNoEDNSOverUDPAndTCP(query, expectedResponse)

        name = 'no-edns.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkNoEDNSOverUDPAndTCP(query, expectedResponse)

        name = 'no-edns.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkNoEDNSOverUDPAndTCP(query, expectedResponse)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=0

        The queries use EDNS with a payload size of 4096 and without the
        DO bit; every answer must have EDNS, DO cleared and a payload
        size of 1042.
        """
        name = 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=False)

    def testWithEDNSWithDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=1

        The queries use EDNS with a payload size of 4096 and the DO bit
        set; every answer must have EDNS, DO set and a payload size of
        1042.
        """
        name = 'edns-do.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=True)

        name = 'edns-do.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=True)

        name = 'edns-do.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=True)

        name = 'edns-do.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=True)

    def testWithEDNSNoOptions(self):
        """
        EDNS on Self-Generated: EDNS with options in the query

        The queries carry a ClientSubnetOption, a payload size of 512 and
        the DO bit. "NoOptions" in the method name refers to the answers,
        which are checked with checkMessageEDNSWithoutOptions and must
        have DO set and a payload size of 1042.
        """
        name = 'edns-options.rcode.edns-self.tests.powerdns.com.'
        # the same option object is reused for the four queries below
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=True)

        name = 'edns-options.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=True)

        name = 'edns-options.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=True)

        name = 'edns-options.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        # dnsdist sets RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkEDNSOverUDPAndTCP(query, expectedResponse, expectDO=True)
e7c732b8
RG
262
263
class TestEDNSSelfGeneratedDisabled(DNSDistTest):
    """
    Verify that self-generated answers (RCodeAction(), TCAction(), Lua,
    SpoofAction()) carry no EDNS data when
    setAddEDNSToSelfGeneratedResponses(false) is configured.
    """

    # Same set of rules as the enabled variant of this test, but bound to the
    # edns-self-disabled.tests.powerdns.com. sub-domains and preceded by the
    # directive that turns EDNS off for self-generated answers.
    _config_template = """
    setAddEDNSToSelfGeneratedResponses(false)

    addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated (disabled): EDNS with DO=0

        Every query uses EDNS (payload 4096, DO cleared); every answer,
        over UDP and over TCP, is checked with checkMessageNoEDNS.
        """
        # --- fixed rcode: REFUSED ---
        qname = 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        wanted = dns.message.make_response(request)
        wanted.set_rcode(dns.rcode.REFUSED)

        for send in (self.sendUDPQuery, self.sendTCPQuery):
            _unused, answer = send(request, response=None, useQueue=False)
            self.checkMessageNoEDNS(wanted, answer)

        # --- truncated answer ---
        qname = 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # RD is cleared because dnsdist copies RD into RA for TC responses
        request.flags = request.flags & ~dns.flags.RD
        wanted = dns.message.make_response(request)
        wanted.flags = wanted.flags | dns.flags.TC

        for send in (self.sendUDPQuery, self.sendTCPQuery):
            _unused, answer = send(request, response=None, useQueue=False)
            self.checkMessageNoEDNS(wanted, answer)

        # --- Lua rule: NXDOMAIN ---
        qname = 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        wanted = dns.message.make_response(request)
        wanted.set_rcode(dns.rcode.NXDOMAIN)

        for send in (self.sendUDPQuery, self.sendTCPQuery):
            _unused, answer = send(request, response=None, useQueue=False)
            self.checkMessageNoEDNS(wanted, answer)

        # --- spoofed A records ---
        qname = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # RD is cleared because dnsdist copies RD into RA for spoofed responses
        request.flags = request.flags & ~dns.flags.RD
        wanted = dns.message.make_response(request)
        spoofed = dns.rrset.from_text(qname, 60, dns.rdataclass.IN, dns.rdatatype.A,
                                      '192.0.2.1', '192.0.2.2')
        wanted.answer.append(spoofed)

        for send in (self.sendUDPQuery, self.sendTCPQuery):
            _unused, answer = send(request, response=None, useQueue=False)
            self.checkMessageNoEDNS(wanted, answer)