# Source: thirdparty/pdns.git (git.ipfire.org mirror) - regression-tests.dnsdist/test_EDNSSelfGenerated.py
from datetime import datetime, timedelta

import clientsubnetoption
import dns.flags
import dns.message
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rrset

from dnsdisttests import DNSDistTest
class TestEDNSSelfGenerated(DNSDistTest):
    """
    Check that dnsdist sends correct EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..)
    answers.

    Every test sends one query per configured action (rcode, tc, lua,
    spoof) over both UDP and TCP, with ``useQueue=False`` and no canned
    backend response, and validates the answer received from dnsdist.
    """

    # NOTE(review): the Lua `function luarule(dq)` / `end` lines around the
    # `return` statement are reconstructed (LuaAction(luarule) needs the
    # function to exist) -- confirm against upstream.
    # The `%s` placeholder is presumably filled in by the DNSDistTest
    # harness with the backend port -- confirm.
    _config_template = """
    addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(dnsdist.REFUSED))
    addAction("tc.edns-self.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    # Suffix shared by every rule in _config_template.
    _selfGeneratedDomain = 'edns-self.tests.powerdns.com.'
    # Must stay in sync with setPayloadSizeOnSelfGeneratedAnswers() above.
    _selfGeneratedPayloadSize = 1042

    def _selfGeneratedCases(self, prefix, **queryArgs):
        """
        Yield one (query, expectedResponse) pair per configured action.

        The query name is '<prefix>.<action>.edns-self.tests.powerdns.com.'
        with <action> being, in order, 'rcode', 'tc', 'lua' and 'spoof'.
        `queryArgs` are passed through to dns.message.make_query(), which
        is how the callers select the EDNS parameters of the query.
        """
        def makeQuery(action):
            name = '%s.%s.%s' % (prefix, action, self._selfGeneratedDomain)
            return name, dns.message.make_query(name, 'A', 'IN', **queryArgs)

        # RCodeAction(dnsdist.REFUSED)
        _, query = makeQuery('rcode')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        yield query, expectedResponse

        # TCAction()
        _, query = makeQuery('tc')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC
        yield query, expectedResponse

        # LuaAction(luarule), which returns DNSAction.Nxdomain
        _, query = makeQuery('lua')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        yield query, expectedResponse

        # SpoofAction({'192.0.2.1', '192.0.2.2'})
        name, query = makeQuery('spoof')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): the TTL / class / type arguments are reconstructed
        # (60, IN, A) -- confirm against upstream.
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        yield query, expectedResponse

    def _receivedResponses(self, query):
        """
        Send `query` over UDP, then over TCP, yielding each received answer.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            yield receivedResponse

    def _checkAnswersWithoutEDNS(self, query, expectedResponse):
        """
        Validate the UDP and TCP answers with checkMessageNoEDNS().
        """
        for receivedResponse in self._receivedResponses(query):
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def _checkAnswersWithEDNS(self, query, expectedResponse, wantDO):
        """
        Validate the UDP and TCP answers with
        checkMessageEDNSWithoutOptions(), then check that the DO bit is set
        only if `wantDO` is true and that the advertised payload size is
        the configured one (1042).
        """
        for receivedResponse in self._receivedResponses(query):
            self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
            doBit = receivedResponse.ednsflags & dns.flags.DO
            if wantDO:
                self.assertTrue(doBit)
            else:
                self.assertFalse(doBit)
            # assertEquals is a deprecated alias that was removed in Python 3.12
            self.assertEqual(receivedResponse.payload, self._selfGeneratedPayloadSize)

    # NOTE(review): this method header is reconstructed; the name follows
    # the 'no-edns' query prefix and the docstring -- confirm against upstream.
    def testNoEDNS(self):
        """
        EDNS on Self-Generated: No existing EDNS
        """
        for query, expectedResponse in self._selfGeneratedCases('no-edns'):
            self._checkAnswersWithoutEDNS(query, expectedResponse)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=0
        """
        cases = self._selfGeneratedCases('edns-no-do',
                                         use_edns=True,
                                         payload=4096,
                                         want_dnssec=False)
        for query, expectedResponse in cases:
            self._checkAnswersWithEDNS(query, expectedResponse, wantDO=False)

    def testWithEDNSWithDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=1
        """
        cases = self._selfGeneratedCases('edns-do',
                                         use_edns=True,
                                         payload=4096,
                                         want_dnssec=True)
        for query, expectedResponse in cases:
            self._checkAnswersWithEDNS(query, expectedResponse, wantDO=True)

    def testWithEDNSNoOptions(self):
        """
        EDNS on Self-Generated: EDNS with options in the query
        """
        # NOTE(review): despite the method name, the query does carry an
        # ECS option; "NoOptions" presumably refers to the answer, which is
        # validated with checkMessageEDNSWithoutOptions() -- confirm.
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        cases = self._selfGeneratedCases('edns-options',
                                         use_edns=True,
                                         options=[ecso],
                                         payload=512,
                                         want_dnssec=True)
        for query, expectedResponse in cases:
            self._checkAnswersWithEDNS(query, expectedResponse, wantDO=True)
class TestEDNSSelfGeneratedDisabled(DNSDistTest):
    """
    Check that dnsdist does not send EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..) when disabled
    """

    # NOTE(review): the Lua `function luarule(dq)` / `end` lines around the
    # `return` statement are reconstructed (LuaAction(luarule) needs the
    # function to exist) -- confirm against upstream.
    # The `%s` placeholder is presumably filled in by the DNSDistTest
    # harness with the backend port -- confirm.
    _config_template = """
    setAddEDNSToSelfGeneratedResponses(false)

    addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(dnsdist.REFUSED))
    addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def _checkAnswersWithoutEDNS(self, query, expectedResponse):
        """
        Send `query` over UDP, then over TCP (no backend response queued),
        and validate each received answer with checkMessageNoEDNS().
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated (disabled): EDNS with DO=0
        """
        def makeQuery(action):
            # Every query carries EDNS (payload 4096, DO=0); only the label
            # selecting the dnsdist rule differs.
            name = 'edns-no-do.%s.edns-self-disabled.tests.powerdns.com.' % action
            query = dns.message.make_query(name, 'A', 'IN',
                                           use_edns=True,
                                           payload=4096,
                                           want_dnssec=False)
            return name, query

        # RCodeAction(dnsdist.REFUSED)
        _, query = makeQuery('rcode')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkAnswersWithoutEDNS(query, expectedResponse)

        # TCAction()
        _, query = makeQuery('tc')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC
        self._checkAnswersWithoutEDNS(query, expectedResponse)

        # LuaAction(luarule), which returns DNSAction.Nxdomain
        _, query = makeQuery('lua')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkAnswersWithoutEDNS(query, expectedResponse)

        # SpoofAction({'192.0.2.1', '192.0.2.2'})
        name, query = makeQuery('spoof')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): the TTL / class / type arguments are reconstructed
        # (60, IN, A) -- confirm against upstream.
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkAnswersWithoutEDNS(query, expectedResponse)