]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSSelfGenerated.py
6186841b52335e0290dcd20210a7434ea61e661c
3 import clientsubnetoption
4 from dnsdisttests
import DNSDistTest
5 from datetime
import datetime
, timedelta
7 class TestEDNSSelfGenerated(DNSDistTest
):
9 Check that dnsdist sends correct EDNS data on
10 self-generated (RCodeAction(), TCAction(), Lua..)
13 _config_template
= """
14 addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
15 addAction("tc.edns-self.tests.powerdns.com.", TCAction())
18 return DNSAction.Nxdomain, ""
21 addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))
23 addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
25 setPayloadSizeOnSelfGeneratedAnswers(1042)
27 newServer{address="127.0.0.1:%s"}
32 EDNS on Self-Generated: No existing EDNS
34 name
= 'no-edns.rcode.edns-self.tests.powerdns.com.'
35 query
= dns
.message
.make_query(name
, 'A', 'IN')
36 query
.flags
&= ~dns
.flags
.RD
37 expectedResponse
= dns
.message
.make_response(query
)
38 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
40 for method
in ("sendUDPQuery", "sendTCPQuery"):
41 sender
= getattr(self
, method
)
42 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
43 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
45 name
= 'no-edns.tc.edns-self.tests.powerdns.com.'
46 query
= dns
.message
.make_query(name
, 'A', 'IN')
47 # dnsdist sets RA = RD for TC responses
48 query
.flags
&= ~dns
.flags
.RD
49 expectedResponse
= dns
.message
.make_response(query
)
50 expectedResponse
.flags |
= dns
.flags
.TC
52 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
53 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
55 name
= 'no-edns.lua.edns-self.tests.powerdns.com.'
56 query
= dns
.message
.make_query(name
, 'A', 'IN')
57 expectedResponse
= dns
.message
.make_response(query
)
58 expectedResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
60 for method
in ("sendUDPQuery", "sendTCPQuery"):
61 sender
= getattr(self
, method
)
62 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
63 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
65 name
= 'no-edns.spoof.edns-self.tests.powerdns.com.'
66 query
= dns
.message
.make_query(name
, 'A', 'IN')
67 # dnsdist set RA = RD for spoofed responses
68 query
.flags
&= ~dns
.flags
.RD
69 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
70 expectedResponse
.answer
.append(dns
.rrset
.from_text(name
,
74 '192.0.2.1', '192.0.2.2'))
76 for method
in ("sendUDPQuery", "sendTCPQuery"):
77 sender
= getattr(self
, method
)
78 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
79 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
81 def testWithEDNSNoDO(self
):
83 EDNS on Self-Generated: EDNS with DO=0
85 name
= 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
86 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
87 query
.flags
&= ~dns
.flags
.RD
88 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
89 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
91 for method
in ("sendUDPQuery", "sendTCPQuery"):
92 sender
= getattr(self
, method
)
93 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
94 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
95 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
96 self
.assertEqual(receivedResponse
.payload
, 1042)
98 name
= 'edns-no-do.tc.edns-self.tests.powerdns.com.'
99 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
100 # dnsdist sets RA = RD for TC responses
101 query
.flags
&= ~dns
.flags
.RD
102 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
103 expectedResponse
.flags |
= dns
.flags
.TC
105 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
106 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
107 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
108 self
.assertEqual(receivedResponse
.payload
, 1042)
110 name
= 'edns-no-do.lua.edns-self.tests.powerdns.com.'
111 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
112 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
113 expectedResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
115 for method
in ("sendUDPQuery", "sendTCPQuery"):
116 sender
= getattr(self
, method
)
117 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
118 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
119 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
120 self
.assertEqual(receivedResponse
.payload
, 1042)
122 name
= 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
123 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
124 # dnsdist set RA = RD for spoofed responses
125 query
.flags
&= ~dns
.flags
.RD
126 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
127 expectedResponse
.answer
.append(dns
.rrset
.from_text(name
,
131 '192.0.2.1', '192.0.2.2'))
133 for method
in ("sendUDPQuery", "sendTCPQuery"):
134 sender
= getattr(self
, method
)
135 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
136 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
137 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
138 self
.assertEqual(receivedResponse
.payload
, 1042)
140 def testWithEDNSWithDO(self
):
142 EDNS on Self-Generated: EDNS with DO=1
144 name
= 'edns-do.rcode.edns-self.tests.powerdns.com.'
145 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
146 query
.flags
&= ~dns
.flags
.RD
147 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
148 expectedResponse
.want_dnssec(True)
149 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
151 for method
in ("sendUDPQuery", "sendTCPQuery"):
152 sender
= getattr(self
, method
)
153 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
154 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
155 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
156 self
.assertEqual(receivedResponse
.payload
, 1042)
158 name
= 'edns-do.tc.edns-self.tests.powerdns.com.'
159 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
160 # dnsdist sets RA = RD for TC responses
161 query
.flags
&= ~dns
.flags
.RD
162 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
163 expectedResponse
.want_dnssec(True)
164 expectedResponse
.flags |
= dns
.flags
.TC
166 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
167 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
168 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
169 self
.assertEqual(receivedResponse
.payload
, 1042)
171 name
= 'edns-do.lua.edns-self.tests.powerdns.com.'
172 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
173 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
174 expectedResponse
.want_dnssec(True)
175 expectedResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
177 for method
in ("sendUDPQuery", "sendTCPQuery"):
178 sender
= getattr(self
, method
)
179 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
180 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
181 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
182 self
.assertEqual(receivedResponse
.payload
, 1042)
184 name
= 'edns-do.spoof.edns-self.tests.powerdns.com.'
185 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
186 # dnsdist set RA = RD for spoofed responses
187 query
.flags
&= ~dns
.flags
.RD
188 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
189 expectedResponse
.want_dnssec(True)
190 expectedResponse
.answer
.append(dns
.rrset
.from_text(name
,
194 '192.0.2.1', '192.0.2.2'))
196 for method
in ("sendUDPQuery", "sendTCPQuery"):
197 sender
= getattr(self
, method
)
198 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
199 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
200 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
201 self
.assertEqual(receivedResponse
.payload
, 1042)
203 def testWithEDNSNoOptions(self
):
205 EDNS on Self-Generated: EDNS with options in the query
207 name
= 'edns-options.rcode.edns-self.tests.powerdns.com.'
208 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
209 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512, want_dnssec
=True)
210 query
.flags
&= ~dns
.flags
.RD
211 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
212 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
213 expectedResponse
.want_dnssec(True)
215 for method
in ("sendUDPQuery", "sendTCPQuery"):
216 sender
= getattr(self
, method
)
217 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
218 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
219 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
220 self
.assertEqual(receivedResponse
.payload
, 1042)
222 name
= 'edns-options.tc.edns-self.tests.powerdns.com.'
223 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512, want_dnssec
=True)
224 # dnsdist sets RA = RD for TC responses
225 query
.flags
&= ~dns
.flags
.RD
226 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
227 expectedResponse
.want_dnssec(True)
228 expectedResponse
.flags |
= dns
.flags
.TC
230 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
231 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
232 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
233 self
.assertEqual(receivedResponse
.payload
, 1042)
235 name
= 'edns-options.lua.edns-self.tests.powerdns.com.'
236 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512, want_dnssec
=True)
237 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
238 expectedResponse
.want_dnssec(True)
239 expectedResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
241 for method
in ("sendUDPQuery", "sendTCPQuery"):
242 sender
= getattr(self
, method
)
243 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
244 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
245 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
246 self
.assertEqual(receivedResponse
.payload
, 1042)
248 name
= 'edns-options.spoof.edns-self.tests.powerdns.com.'
249 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512, want_dnssec
=True)
250 # dnsdist set RA = RD for spoofed responses
251 query
.flags
&= ~dns
.flags
.RD
252 expectedResponse
= dns
.message
.make_response(query
, our_payload
=1042)
253 expectedResponse
.want_dnssec(True)
254 expectedResponse
.answer
.append(dns
.rrset
.from_text(name
,
258 '192.0.2.1', '192.0.2.2'))
260 for method
in ("sendUDPQuery", "sendTCPQuery"):
261 sender
= getattr(self
, method
)
262 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
263 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
264 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
265 self
.assertEqual(receivedResponse
.payload
, 1042)
268 class TestEDNSSelfGeneratedDisabled(DNSDistTest
):
270 Check that dnsdist does not send EDNS data on
271 self-generated (RCodeAction(), TCAction(), Lua..) when disabled
274 _config_template
= """
275 setAddEDNSToSelfGeneratedResponses(false)
277 addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
278 addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())
281 return DNSAction.Nxdomain, ""
284 addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))
286 addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
288 setPayloadSizeOnSelfGeneratedAnswers(1042)
290 newServer{address="127.0.0.1:%s"}
293 def testWithEDNSNoDO(self
):
295 EDNS on Self-Generated (disabled): EDNS with DO=0
297 name
= 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
298 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
299 query
.flags
&= ~dns
.flags
.RD
300 expectedResponse
= dns
.message
.make_response(query
)
301 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
303 for method
in ("sendUDPQuery", "sendTCPQuery"):
304 sender
= getattr(self
, method
)
305 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
306 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
308 name
= 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
309 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
310 # dnsdist sets RA = RD for TC responses
311 query
.flags
&= ~dns
.flags
.RD
312 expectedResponse
= dns
.message
.make_response(query
)
313 expectedResponse
.flags |
= dns
.flags
.TC
315 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
316 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
318 name
= 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
319 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
320 expectedResponse
= dns
.message
.make_response(query
)
321 expectedResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
323 for method
in ("sendUDPQuery", "sendTCPQuery"):
324 sender
= getattr(self
, method
)
325 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
326 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
328 name
= 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
329 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
330 # dnsdist set RA = RD for spoofed responses
331 query
.flags
&= ~dns
.flags
.RD
332 expectedResponse
= dns
.message
.make_response(query
)
333 expectedResponse
.answer
.append(dns
.rrset
.from_text(name
,
337 '192.0.2.1', '192.0.2.2'))
339 for method
in ("sendUDPQuery", "sendTCPQuery"):
340 sender
= getattr(self
, method
)
341 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
342 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)