]> git.ipfire.org Git - thirdparty/pdns.git/blame_incremental - regression-tests.dnsdist/test_EDNSSelfGenerated.py
Merge pull request #14580 from rgacogne/fix-coverity
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSSelfGenerated.py
... / ...
CommitLineData
1#!/usr/bin/env python
2import dns
3import clientsubnetoption
4from dnsdisttests import DNSDistTest
5from datetime import datetime, timedelta
6
7class TestEDNSSelfGenerated(DNSDistTest):
8 """
9 Check that dnsdist sends correct EDNS data on
10 self-generated (RCodeAction(), TCAction(), Lua..)
11 """
12
13 _config_template = """
14 addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
15 addAction("tc.edns-self.tests.powerdns.com.", TCAction())
16
17 function luarule(dq)
18 return DNSAction.Nxdomain, ""
19 end
20
21 addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))
22
23 addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
24
25 setPayloadSizeOnSelfGeneratedAnswers(1042)
26
27 newServer{address="127.0.0.1:%s"}
28 """
29
30 def testNoEDNS(self):
31 """
32 EDNS on Self-Generated: No existing EDNS
33 """
34 name = 'no-edns.rcode.edns-self.tests.powerdns.com.'
35 query = dns.message.make_query(name, 'A', 'IN')
36 query.flags &= ~dns.flags.RD
37 expectedResponse = dns.message.make_response(query)
38 expectedResponse.set_rcode(dns.rcode.REFUSED)
39
40 for method in ("sendUDPQuery", "sendTCPQuery"):
41 sender = getattr(self, method)
42 (_, receivedResponse) = sender(query, response=None, useQueue=False)
43 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
44
45 name = 'no-edns.tc.edns-self.tests.powerdns.com.'
46 query = dns.message.make_query(name, 'A', 'IN')
47 # dnsdist sets RA = RD for TC responses
48 query.flags &= ~dns.flags.RD
49 expectedResponse = dns.message.make_response(query)
50 expectedResponse.flags |= dns.flags.TC
51
52 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
53 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
54
55 name = 'no-edns.lua.edns-self.tests.powerdns.com.'
56 query = dns.message.make_query(name, 'A', 'IN')
57 expectedResponse = dns.message.make_response(query)
58 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
59
60 for method in ("sendUDPQuery", "sendTCPQuery"):
61 sender = getattr(self, method)
62 (_, receivedResponse) = sender(query, response=None, useQueue=False)
63 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
64
65 name = 'no-edns.spoof.edns-self.tests.powerdns.com.'
66 query = dns.message.make_query(name, 'A', 'IN')
67 # dnsdist set RA = RD for spoofed responses
68 query.flags &= ~dns.flags.RD
69 expectedResponse = dns.message.make_response(query, our_payload=1042)
70 expectedResponse.answer.append(dns.rrset.from_text(name,
71 60,
72 dns.rdataclass.IN,
73 dns.rdatatype.A,
74 '192.0.2.1', '192.0.2.2'))
75
76 for method in ("sendUDPQuery", "sendTCPQuery"):
77 sender = getattr(self, method)
78 (_, receivedResponse) = sender(query, response=None, useQueue=False)
79 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
80
81 def testWithEDNSNoDO(self):
82 """
83 EDNS on Self-Generated: EDNS with DO=0
84 """
85 name = 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
86 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
87 query.flags &= ~dns.flags.RD
88 expectedResponse = dns.message.make_response(query, our_payload=1042)
89 expectedResponse.set_rcode(dns.rcode.REFUSED)
90
91 for method in ("sendUDPQuery", "sendTCPQuery"):
92 sender = getattr(self, method)
93 (_, receivedResponse) = sender(query, response=None, useQueue=False)
94 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
95 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
96 self.assertEqual(receivedResponse.payload, 1042)
97
98 name = 'edns-no-do.tc.edns-self.tests.powerdns.com.'
99 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
100 # dnsdist sets RA = RD for TC responses
101 query.flags &= ~dns.flags.RD
102 expectedResponse = dns.message.make_response(query, our_payload=1042)
103 expectedResponse.flags |= dns.flags.TC
104
105 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
106 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
107 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
108 self.assertEqual(receivedResponse.payload, 1042)
109
110 name = 'edns-no-do.lua.edns-self.tests.powerdns.com.'
111 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
112 expectedResponse = dns.message.make_response(query, our_payload=1042)
113 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
114
115 for method in ("sendUDPQuery", "sendTCPQuery"):
116 sender = getattr(self, method)
117 (_, receivedResponse) = sender(query, response=None, useQueue=False)
118 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
119 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
120 self.assertEqual(receivedResponse.payload, 1042)
121
122 name = 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
123 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
124 # dnsdist set RA = RD for spoofed responses
125 query.flags &= ~dns.flags.RD
126 expectedResponse = dns.message.make_response(query, our_payload=1042)
127 expectedResponse.answer.append(dns.rrset.from_text(name,
128 60,
129 dns.rdataclass.IN,
130 dns.rdatatype.A,
131 '192.0.2.1', '192.0.2.2'))
132
133 for method in ("sendUDPQuery", "sendTCPQuery"):
134 sender = getattr(self, method)
135 (_, receivedResponse) = sender(query, response=None, useQueue=False)
136 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
137 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
138 self.assertEqual(receivedResponse.payload, 1042)
139
140 def testWithEDNSWithDO(self):
141 """
142 EDNS on Self-Generated: EDNS with DO=1
143 """
144 name = 'edns-do.rcode.edns-self.tests.powerdns.com.'
145 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
146 query.flags &= ~dns.flags.RD
147 expectedResponse = dns.message.make_response(query, our_payload=1042)
148 expectedResponse.want_dnssec(True)
149 expectedResponse.set_rcode(dns.rcode.REFUSED)
150
151 for method in ("sendUDPQuery", "sendTCPQuery"):
152 sender = getattr(self, method)
153 (_, receivedResponse) = sender(query, response=None, useQueue=False)
154 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
155 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
156 self.assertEqual(receivedResponse.payload, 1042)
157
158 name = 'edns-do.tc.edns-self.tests.powerdns.com.'
159 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
160 # dnsdist sets RA = RD for TC responses
161 query.flags &= ~dns.flags.RD
162 expectedResponse = dns.message.make_response(query, our_payload=1042)
163 expectedResponse.want_dnssec(True)
164 expectedResponse.flags |= dns.flags.TC
165
166 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
167 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
168 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
169 self.assertEqual(receivedResponse.payload, 1042)
170
171 name = 'edns-do.lua.edns-self.tests.powerdns.com.'
172 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
173 expectedResponse = dns.message.make_response(query, our_payload=1042)
174 expectedResponse.want_dnssec(True)
175 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
176
177 for method in ("sendUDPQuery", "sendTCPQuery"):
178 sender = getattr(self, method)
179 (_, receivedResponse) = sender(query, response=None, useQueue=False)
180 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
181 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
182 self.assertEqual(receivedResponse.payload, 1042)
183
184 name = 'edns-do.spoof.edns-self.tests.powerdns.com.'
185 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
186 # dnsdist set RA = RD for spoofed responses
187 query.flags &= ~dns.flags.RD
188 expectedResponse = dns.message.make_response(query, our_payload=1042)
189 expectedResponse.want_dnssec(True)
190 expectedResponse.answer.append(dns.rrset.from_text(name,
191 60,
192 dns.rdataclass.IN,
193 dns.rdatatype.A,
194 '192.0.2.1', '192.0.2.2'))
195
196 for method in ("sendUDPQuery", "sendTCPQuery"):
197 sender = getattr(self, method)
198 (_, receivedResponse) = sender(query, response=None, useQueue=False)
199 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
200 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
201 self.assertEqual(receivedResponse.payload, 1042)
202
203 def testWithEDNSNoOptions(self):
204 """
205 EDNS on Self-Generated: EDNS with options in the query
206 """
207 name = 'edns-options.rcode.edns-self.tests.powerdns.com.'
208 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
209 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
210 query.flags &= ~dns.flags.RD
211 expectedResponse = dns.message.make_response(query, our_payload=1042)
212 expectedResponse.set_rcode(dns.rcode.REFUSED)
213 expectedResponse.want_dnssec(True)
214
215 for method in ("sendUDPQuery", "sendTCPQuery"):
216 sender = getattr(self, method)
217 (_, receivedResponse) = sender(query, response=None, useQueue=False)
218 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
219 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
220 self.assertEqual(receivedResponse.payload, 1042)
221
222 name = 'edns-options.tc.edns-self.tests.powerdns.com.'
223 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
224 # dnsdist sets RA = RD for TC responses
225 query.flags &= ~dns.flags.RD
226 expectedResponse = dns.message.make_response(query, our_payload=1042)
227 expectedResponse.want_dnssec(True)
228 expectedResponse.flags |= dns.flags.TC
229
230 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
231 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
232 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
233 self.assertEqual(receivedResponse.payload, 1042)
234
235 name = 'edns-options.lua.edns-self.tests.powerdns.com.'
236 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
237 expectedResponse = dns.message.make_response(query, our_payload=1042)
238 expectedResponse.want_dnssec(True)
239 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
240
241 for method in ("sendUDPQuery", "sendTCPQuery"):
242 sender = getattr(self, method)
243 (_, receivedResponse) = sender(query, response=None, useQueue=False)
244 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
245 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
246 self.assertEqual(receivedResponse.payload, 1042)
247
248 name = 'edns-options.spoof.edns-self.tests.powerdns.com.'
249 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
250 # dnsdist set RA = RD for spoofed responses
251 query.flags &= ~dns.flags.RD
252 expectedResponse = dns.message.make_response(query, our_payload=1042)
253 expectedResponse.want_dnssec(True)
254 expectedResponse.answer.append(dns.rrset.from_text(name,
255 60,
256 dns.rdataclass.IN,
257 dns.rdatatype.A,
258 '192.0.2.1', '192.0.2.2'))
259
260 for method in ("sendUDPQuery", "sendTCPQuery"):
261 sender = getattr(self, method)
262 (_, receivedResponse) = sender(query, response=None, useQueue=False)
263 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
264 self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
265 self.assertEqual(receivedResponse.payload, 1042)
266
267
268class TestEDNSSelfGeneratedDisabled(DNSDistTest):
269 """
270 Check that dnsdist does not send EDNS data on
271 self-generated (RCodeAction(), TCAction(), Lua..) when disabled
272 """
273
274 _config_template = """
275 setAddEDNSToSelfGeneratedResponses(false)
276
277 addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
278 addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())
279
280 function luarule(dq)
281 return DNSAction.Nxdomain, ""
282 end
283
284 addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))
285
286 addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
287
288 setPayloadSizeOnSelfGeneratedAnswers(1042)
289
290 newServer{address="127.0.0.1:%s"}
291 """
292
293 def testWithEDNSNoDO(self):
294 """
295 EDNS on Self-Generated (disabled): EDNS with DO=0
296 """
297 name = 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
298 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
299 query.flags &= ~dns.flags.RD
300 expectedResponse = dns.message.make_response(query)
301 expectedResponse.set_rcode(dns.rcode.REFUSED)
302
303 for method in ("sendUDPQuery", "sendTCPQuery"):
304 sender = getattr(self, method)
305 (_, receivedResponse) = sender(query, response=None, useQueue=False)
306 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
307
308 name = 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
309 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
310 # dnsdist sets RA = RD for TC responses
311 query.flags &= ~dns.flags.RD
312 expectedResponse = dns.message.make_response(query)
313 expectedResponse.flags |= dns.flags.TC
314
315 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
316 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
317
318 name = 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
319 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
320 expectedResponse = dns.message.make_response(query)
321 expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
322
323 for method in ("sendUDPQuery", "sendTCPQuery"):
324 sender = getattr(self, method)
325 (_, receivedResponse) = sender(query, response=None, useQueue=False)
326 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
327
328 name = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
329 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
330 # dnsdist set RA = RD for spoofed responses
331 query.flags &= ~dns.flags.RD
332 expectedResponse = dns.message.make_response(query)
333 expectedResponse.answer.append(dns.rrset.from_text(name,
334 60,
335 dns.rdataclass.IN,
336 dns.rdatatype.A,
337 '192.0.2.1', '192.0.2.2'))
338
339 for method in ("sendUDPQuery", "sendTCPQuery"):
340 sender = getattr(self, method)
341 (_, receivedResponse) = sender(query, response=None, useQueue=False)
342 self.checkMessageNoEDNS(expectedResponse, receivedResponse)