]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSSelfGenerated.py
Merge pull request #7699 from chbruyand/recursor-protobuf-logging
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSSelfGenerated.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 from dnsdisttests import DNSDistTest
5 from datetime import datetime, timedelta
6
7 class TestEDNSSelfGenerated(DNSDistTest):
8 """
9 Check that dnsdist sends correct EDNS data on
10 self-generated (RCodeAction(), TCAction(), Lua..)
11 """
12
13 _config_template = """
14 addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
15 addAction("tc.edns-self.tests.powerdns.com.", TCAction())
16
17 function luarule(dq)
18 return DNSAction.Nxdomain, ""
19 end
20
21 addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))
22
23 addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
24
25 setPayloadSizeOnSelfGeneratedAnswers(1042)
26
27 newServer{address="127.0.0.1:%s"}
28 """
29
30 def testNoEDNS(self):
31 """
32 EDNS on Self-Generated: No existing EDNS
33 """
34 name = 'no-edns.rcode.edns-self.tests.powerdns.com.'
35 query = dns.message.make_query(name, 'A', 'IN')
36 expectedResponse = dns.message.make_response(query)
37 expectedResponse.set_rcode(dns.rcode.REFUSED)
38
39 for method in ("sendUDPQuery", "sendTCPQuery"):
40 sender = getattr(self, method)
41 (_, receivedResponse) = sender(query, response=None, useQueue=False)
42 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
43
44 name = 'no-edns.tc.edns-self.tests.powerdns.com.'
45 query = dns.message.make_query(name, 'A', 'IN')
46 expectedResponse = dns.message.make_response(query)
47 expectedResponse.flags |= dns.flags.TC
48
49 for method in ("sendUDPQuery", "sendTCPQuery"):
50 sender = getattr(self, method)
51 (_, receivedResponse) = sender(query, response=None, useQueue=False)
52 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
53
54 name = 'no-edns.lua.edns-self.tests.powerdns.com.'
55 query = dns.message.make_query(name, 'A', 'IN')
56 expectedResponse = dns.message.make_response(query)
57 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
58
59 for method in ("sendUDPQuery", "sendTCPQuery"):
60 sender = getattr(self, method)
61 (_, receivedResponse) = sender(query, response=None, useQueue=False)
62 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
63
64 name = 'no-edns.spoof.edns-self.tests.powerdns.com.'
65 query = dns.message.make_query(name, 'A', 'IN')
66 # dnsdist set RA = RD for spoofed responses
67 query.flags &= ~dns.flags.RD
68 expectedResponse = dns.message.make_response(query, our_payload=1042)
69 expectedResponse.answer.append(dns.rrset.from_text(name,
70 60,
71 dns.rdataclass.IN,
72 dns.rdatatype.A,
73 '192.0.2.1', '192.0.2.2'))
74
75 for method in ("sendUDPQuery", "sendTCPQuery"):
76 sender = getattr(self, method)
77 (_, receivedResponse) = sender(query, response=None, useQueue=False)
78 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
79
80 def testWithEDNSNoDO(self):
81 """
82 EDNS on Self-Generated: EDNS with DO=0
83 """
84 name = 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
85 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
86 expectedResponse = dns.message.make_response(query, our_payload=1042)
87 expectedResponse.set_rcode(dns.rcode.REFUSED)
88
89 for method in ("sendUDPQuery", "sendTCPQuery"):
90 sender = getattr(self, method)
91 (_, receivedResponse) = sender(query, response=None, useQueue=False)
92 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
93 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
94 self.assertEquals(receivedResponse.payload, 1042)
95
96 name = 'edns-no-do.tc.edns-self.tests.powerdns.com.'
97 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
98 expectedResponse = dns.message.make_response(query, our_payload=1042)
99 expectedResponse.flags |= dns.flags.TC
100
101 for method in ("sendUDPQuery", "sendTCPQuery"):
102 sender = getattr(self, method)
103 (_, receivedResponse) = sender(query, response=None, useQueue=False)
104 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
105 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
106 self.assertEquals(receivedResponse.payload, 1042)
107
108 name = 'edns-no-do.lua.edns-self.tests.powerdns.com.'
109 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
110 expectedResponse = dns.message.make_response(query, our_payload=1042)
111 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
112
113 for method in ("sendUDPQuery", "sendTCPQuery"):
114 sender = getattr(self, method)
115 (_, receivedResponse) = sender(query, response=None, useQueue=False)
116 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
117 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
118 self.assertEquals(receivedResponse.payload, 1042)
119
120 name = 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
121 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
122 # dnsdist set RA = RD for spoofed responses
123 query.flags &= ~dns.flags.RD
124 expectedResponse = dns.message.make_response(query, our_payload=1042)
125 expectedResponse.answer.append(dns.rrset.from_text(name,
126 60,
127 dns.rdataclass.IN,
128 dns.rdatatype.A,
129 '192.0.2.1', '192.0.2.2'))
130
131 for method in ("sendUDPQuery", "sendTCPQuery"):
132 sender = getattr(self, method)
133 (_, receivedResponse) = sender(query, response=None, useQueue=False)
134 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
135 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
136 self.assertEquals(receivedResponse.payload, 1042)
137
138 def testWithEDNSWithDO(self):
139 """
140 EDNS on Self-Generated: EDNS with DO=1
141 """
142 name = 'edns-do.rcode.edns-self.tests.powerdns.com.'
143 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
144 expectedResponse = dns.message.make_response(query, our_payload=1042)
145 expectedResponse.set_rcode(dns.rcode.REFUSED)
146
147 for method in ("sendUDPQuery", "sendTCPQuery"):
148 sender = getattr(self, method)
149 (_, receivedResponse) = sender(query, response=None, useQueue=False)
150 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
151 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
152 self.assertEquals(receivedResponse.payload, 1042)
153
154 name = 'edns-do.tc.edns-self.tests.powerdns.com.'
155 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
156 expectedResponse = dns.message.make_response(query, our_payload=1042)
157 expectedResponse.flags |= dns.flags.TC
158
159 for method in ("sendUDPQuery", "sendTCPQuery"):
160 sender = getattr(self, method)
161 (_, receivedResponse) = sender(query, response=None, useQueue=False)
162 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
163 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
164 self.assertEquals(receivedResponse.payload, 1042)
165
166 name = 'edns-do.lua.edns-self.tests.powerdns.com.'
167 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
168 expectedResponse = dns.message.make_response(query, our_payload=1042)
169 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
170
171 for method in ("sendUDPQuery", "sendTCPQuery"):
172 sender = getattr(self, method)
173 (_, receivedResponse) = sender(query, response=None, useQueue=False)
174 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
175 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
176 self.assertEquals(receivedResponse.payload, 1042)
177
178 name = 'edns-do.spoof.edns-self.tests.powerdns.com.'
179 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
180 # dnsdist set RA = RD for spoofed responses
181 query.flags &= ~dns.flags.RD
182 expectedResponse = dns.message.make_response(query, our_payload=1042)
183 expectedResponse.answer.append(dns.rrset.from_text(name,
184 60,
185 dns.rdataclass.IN,
186 dns.rdatatype.A,
187 '192.0.2.1', '192.0.2.2'))
188
189 for method in ("sendUDPQuery", "sendTCPQuery"):
190 sender = getattr(self, method)
191 (_, receivedResponse) = sender(query, response=None, useQueue=False)
192 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
193 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
194 self.assertEquals(receivedResponse.payload, 1042)
195
196 def testWithEDNSNoOptions(self):
197 """
198 EDNS on Self-Generated: EDNS with options in the query
199 """
200 name = 'edns-options.rcode.edns-self.tests.powerdns.com.'
201 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
202 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
203 expectedResponse = dns.message.make_response(query, our_payload=1042)
204 expectedResponse.set_rcode(dns.rcode.REFUSED)
205
206 for method in ("sendUDPQuery", "sendTCPQuery"):
207 sender = getattr(self, method)
208 (_, receivedResponse) = sender(query, response=None, useQueue=False)
209 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
210 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
211 self.assertEquals(receivedResponse.payload, 1042)
212
213 name = 'edns-options.tc.edns-self.tests.powerdns.com.'
214 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
215 expectedResponse = dns.message.make_response(query, our_payload=1042)
216 expectedResponse.flags |= dns.flags.TC
217
218 for method in ("sendUDPQuery", "sendTCPQuery"):
219 sender = getattr(self, method)
220 (_, receivedResponse) = sender(query, response=None, useQueue=False)
221 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
222 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
223 self.assertEquals(receivedResponse.payload, 1042)
224
225 name = 'edns-options.lua.edns-self.tests.powerdns.com.'
226 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
227 expectedResponse = dns.message.make_response(query, our_payload=1042)
228 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
229
230 for method in ("sendUDPQuery", "sendTCPQuery"):
231 sender = getattr(self, method)
232 (_, receivedResponse) = sender(query, response=None, useQueue=False)
233 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
234 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
235 self.assertEquals(receivedResponse.payload, 1042)
236
237 name = 'edns-options.spoof.edns-self.tests.powerdns.com.'
238 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
239 # dnsdist set RA = RD for spoofed responses
240 query.flags &= ~dns.flags.RD
241 expectedResponse = dns.message.make_response(query, our_payload=1042)
242 expectedResponse.answer.append(dns.rrset.from_text(name,
243 60,
244 dns.rdataclass.IN,
245 dns.rdatatype.A,
246 '192.0.2.1', '192.0.2.2'))
247
248 for method in ("sendUDPQuery", "sendTCPQuery"):
249 sender = getattr(self, method)
250 (_, receivedResponse) = sender(query, response=None, useQueue=False)
251 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
252 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
253 self.assertEquals(receivedResponse.payload, 1042)
254
255
256 class TestEDNSSelfGeneratedDisabled(DNSDistTest):
257 """
258 Check that dnsdist does not send EDNS data on
259 self-generated (RCodeAction(), TCAction(), Lua..) when disabled
260 """
261
262 _config_template = """
263 setAddEDNSToSelfGeneratedResponses(false)
264
265 addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
266 addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())
267
268 function luarule(dq)
269 return DNSAction.Nxdomain, ""
270 end
271
272 addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))
273
274 addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
275
276 setPayloadSizeOnSelfGeneratedAnswers(1042)
277
278 newServer{address="127.0.0.1:%s"}
279 """
280
281 def testWithEDNSNoDO(self):
282 """
283 EDNS on Self-Generated (disabled): EDNS with DO=0
284 """
285 name = 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
286 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
287 expectedResponse = dns.message.make_response(query)
288 expectedResponse.set_rcode(dns.rcode.REFUSED)
289
290 for method in ("sendUDPQuery", "sendTCPQuery"):
291 sender = getattr(self, method)
292 (_, receivedResponse) = sender(query, response=None, useQueue=False)
293 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
294
295 name = 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
296 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
297 expectedResponse = dns.message.make_response(query)
298 expectedResponse.flags |= dns.flags.TC
299
300 for method in ("sendUDPQuery", "sendTCPQuery"):
301 sender = getattr(self, method)
302 (_, receivedResponse) = sender(query, response=None, useQueue=False)
303 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
304
305 name = 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
306 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
307 expectedResponse = dns.message.make_response(query)
308 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
309
310 for method in ("sendUDPQuery", "sendTCPQuery"):
311 sender = getattr(self, method)
312 (_, receivedResponse) = sender(query, response=None, useQueue=False)
313 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
314
315 name = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
316 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
317 # dnsdist set RA = RD for spoofed responses
318 query.flags &= ~dns.flags.RD
319 expectedResponse = dns.message.make_response(query)
320 expectedResponse.answer.append(dns.rrset.from_text(name,
321 60,
322 dns.rdataclass.IN,
323 dns.rdatatype.A,
324 '192.0.2.1', '192.0.2.2'))
325
326 for method in ("sendUDPQuery", "sendTCPQuery"):
327 sender = getattr(self, method)
328 (_, receivedResponse) = sender(query, response=None, useQueue=False)
329 self.checkMessageNoEDNS(expectedResponse, receivedResponse)