]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSSelfGenerated.py
Merge pull request #6823 from klaus3000/load-ourserial-on-NOTIFY
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSSelfGenerated.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 from dnsdisttests import DNSDistTest
5 from datetime import datetime, timedelta
6
7 class TestEDNSSelfGenerated(DNSDistTest):
8 """
9 Check that dnsdist sends correct EDNS data on
10 self-generated (RCodeAction(), TCAction(), Lua..)
11 """
12
13 _config_template = """
14 addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(dnsdist.REFUSED))
15 addAction("tc.edns-self.tests.powerdns.com.", TCAction())
16
17 function luarule(dq)
18 return DNSAction.Nxdomain, ""
19 end
20
21 addLuaAction("lua.edns-self.tests.powerdns.com.", luarule)
22
23 addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
24
25 setPayloadSizeOnSelfGeneratedAnswers(1042)
26
27 newServer{address="127.0.0.1:%s"}
28 """
29
30 def testNoEDNS(self):
31 """
32 EDNS on Self-Generated: No existing EDNS
33 """
34 name = 'no-edns.rcode.edns-self.tests.powerdns.com.'
35 query = dns.message.make_query(name, 'A', 'IN')
36 expectedResponse = dns.message.make_response(query)
37 expectedResponse.set_rcode(dns.rcode.REFUSED)
38
39 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
40 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
41
42 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
43 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
44
45 name = 'no-edns.tc.edns-self.tests.powerdns.com.'
46 query = dns.message.make_query(name, 'A', 'IN')
47 expectedResponse = dns.message.make_response(query)
48 expectedResponse.flags |= dns.flags.TC
49
50 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
51 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
52
53 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
54 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
55
56 name = 'no-edns.lua.edns-self.tests.powerdns.com.'
57 query = dns.message.make_query(name, 'A', 'IN')
58 expectedResponse = dns.message.make_response(query)
59 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
60
61 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
62 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
63
64 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
65 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
66
67 name = 'no-edns.spoof.edns-self.tests.powerdns.com.'
68 query = dns.message.make_query(name, 'A', 'IN')
69 # dnsdist set RA = RD for spoofed responses
70 query.flags &= ~dns.flags.RD
71 expectedResponse = dns.message.make_response(query)
72 expectedResponse.answer.append(dns.rrset.from_text(name,
73 60,
74 dns.rdataclass.IN,
75 dns.rdatatype.A,
76 '192.0.2.1', '192.0.2.2'))
77
78 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
79 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
80
81 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
82 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
83
84 def testWithEDNSNoDO(self):
85 """
86 EDNS on Self-Generated: EDNS with DO=0
87 """
88 name = 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
89 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
90 expectedResponse = dns.message.make_response(query)
91 expectedResponse.set_rcode(dns.rcode.REFUSED)
92
93 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
94 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
95 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
96 self.assertEquals(receivedResponse.payload, 1042)
97
98 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
99 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
100 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
101 self.assertEquals(receivedResponse.payload, 1042)
102
103 name = 'edns-no-do.tc.edns-self.tests.powerdns.com.'
104 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
105 expectedResponse = dns.message.make_response(query)
106 expectedResponse.flags |= dns.flags.TC
107
108 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
109 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
110 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
111 self.assertEquals(receivedResponse.payload, 1042)
112
113 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
114 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
115 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
116 self.assertEquals(receivedResponse.payload, 1042)
117
118 name = 'edns-no-do.lua.edns-self.tests.powerdns.com.'
119 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
120 expectedResponse = dns.message.make_response(query)
121 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
122
123 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
124 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
125 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
126 self.assertEquals(receivedResponse.payload, 1042)
127
128 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
129 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
130 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
131 self.assertEquals(receivedResponse.payload, 1042)
132
133 name = 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
134 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
135 # dnsdist set RA = RD for spoofed responses
136 query.flags &= ~dns.flags.RD
137 expectedResponse = dns.message.make_response(query)
138 expectedResponse.answer.append(dns.rrset.from_text(name,
139 60,
140 dns.rdataclass.IN,
141 dns.rdatatype.A,
142 '192.0.2.1', '192.0.2.2'))
143
144 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
145 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
146 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
147 self.assertEquals(receivedResponse.payload, 1042)
148
149 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
150 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
151 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
152 self.assertEquals(receivedResponse.payload, 1042)
153
154 def testWithEDNSWithDO(self):
155 """
156 EDNS on Self-Generated: EDNS with DO=1
157 """
158 name = 'edns-do.rcode.edns-self.tests.powerdns.com.'
159 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
160 expectedResponse = dns.message.make_response(query)
161 expectedResponse.set_rcode(dns.rcode.REFUSED)
162
163 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
164 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
165 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
166 self.assertEquals(receivedResponse.payload, 1042)
167
168 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
169 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
170 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
171 self.assertEquals(receivedResponse.payload, 1042)
172
173 name = 'edns-do.tc.edns-self.tests.powerdns.com.'
174 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
175 expectedResponse = dns.message.make_response(query)
176 expectedResponse.flags |= dns.flags.TC
177
178 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
179 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
180 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
181 self.assertEquals(receivedResponse.payload, 1042)
182
183 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
184 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
185 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
186 self.assertEquals(receivedResponse.payload, 1042)
187
188 name = 'edns-do.lua.edns-self.tests.powerdns.com.'
189 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
190 expectedResponse = dns.message.make_response(query)
191 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
192
193 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
194 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
195 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
196 self.assertEquals(receivedResponse.payload, 1042)
197
198 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
199 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
200 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
201 self.assertEquals(receivedResponse.payload, 1042)
202
203 name = 'edns-do.spoof.edns-self.tests.powerdns.com.'
204 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
205 # dnsdist set RA = RD for spoofed responses
206 query.flags &= ~dns.flags.RD
207 expectedResponse = dns.message.make_response(query)
208 expectedResponse.answer.append(dns.rrset.from_text(name,
209 60,
210 dns.rdataclass.IN,
211 dns.rdatatype.A,
212 '192.0.2.1', '192.0.2.2'))
213
214 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
215 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
216 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
217 self.assertEquals(receivedResponse.payload, 1042)
218
219 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
220 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
221 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
222 self.assertEquals(receivedResponse.payload, 1042)
223
224 def testWithEDNSNoOptions(self):
225 """
226 EDNS on Self-Generated: EDNS with options in the query
227 """
228 name = 'edns-options.rcode.edns-self.tests.powerdns.com.'
229 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
230 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
231 expectedResponse = dns.message.make_response(query)
232 expectedResponse.set_rcode(dns.rcode.REFUSED)
233
234 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
235 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
236 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
237 self.assertEquals(receivedResponse.payload, 1042)
238
239 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
240 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
241 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
242 self.assertEquals(receivedResponse.payload, 1042)
243
244 name = 'edns-options.tc.edns-self.tests.powerdns.com.'
245 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
246 expectedResponse = dns.message.make_response(query)
247 expectedResponse.flags |= dns.flags.TC
248
249 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
250 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
251 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
252 self.assertEquals(receivedResponse.payload, 1042)
253
254 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
255 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
256 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
257 self.assertEquals(receivedResponse.payload, 1042)
258
259 name = 'edns-options.lua.edns-self.tests.powerdns.com.'
260 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
261 expectedResponse = dns.message.make_response(query)
262 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
263
264 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
265 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
266 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
267 self.assertEquals(receivedResponse.payload, 1042)
268
269 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
270 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
271 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
272 self.assertEquals(receivedResponse.payload, 1042)
273
274 name = 'edns-options.spoof.edns-self.tests.powerdns.com.'
275 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
276 # dnsdist set RA = RD for spoofed responses
277 query.flags &= ~dns.flags.RD
278 expectedResponse = dns.message.make_response(query)
279 expectedResponse.answer.append(dns.rrset.from_text(name,
280 60,
281 dns.rdataclass.IN,
282 dns.rdatatype.A,
283 '192.0.2.1', '192.0.2.2'))
284
285 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
286 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
287 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
288 self.assertEquals(receivedResponse.payload, 1042)
289
290 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
291 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
292 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
293 self.assertEquals(receivedResponse.payload, 1042)
294
295
296 class TestEDNSSelfGeneratedDisabled(DNSDistTest):
297 """
298 Check that dnsdist does not send EDNS data on
299 self-generated (RCodeAction(), TCAction(), Lua..) when disabled
300 """
301
302 _config_template = """
303 setAddEDNSToSelfGeneratedResponses(false)
304
305 addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(dnsdist.REFUSED))
306 addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())
307
308 function luarule(dq)
309 return DNSAction.Nxdomain, ""
310 end
311
312 addLuaAction("lua.edns-self-disabled.tests.powerdns.com.", luarule)
313
314 addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
315
316 setPayloadSizeOnSelfGeneratedAnswers(1042)
317
318 newServer{address="127.0.0.1:%s"}
319 """
320
321 def testWithEDNSNoDO(self):
322 """
323 EDNS on Self-Generated (disabled): EDNS with DO=0
324 """
325 name = 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
326 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
327 expectedResponse = dns.message.make_response(query)
328 expectedResponse.set_rcode(dns.rcode.REFUSED)
329
330 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
331 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
332
333 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
334 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
335
336 name = 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
337 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
338 expectedResponse = dns.message.make_response(query)
339 expectedResponse.flags |= dns.flags.TC
340
341 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
342 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
343
344 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
345 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
346
347 name = 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
348 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
349 expectedResponse = dns.message.make_response(query)
350 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
351
352 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
353 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
354
355 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
356 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
357
358 name = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
359 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
360 # dnsdist set RA = RD for spoofed responses
361 query.flags &= ~dns.flags.RD
362 expectedResponse = dns.message.make_response(query)
363 expectedResponse.answer.append(dns.rrset.from_text(name,
364 60,
365 dns.rdataclass.IN,
366 dns.rdatatype.A,
367 '192.0.2.1', '192.0.2.2'))
368
369 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
370 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
371
372 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
373 self.checkMessageNoEDNS(expectedResponse, receivedResponse)