# git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSSelfGenerated.py
from datetime import datetime, timedelta

import clientsubnetoption
import dns

from dnsdisttests import DNSDistTest
class TestEDNSSelfGenerated(DNSDistTest):
    """
    Check that dnsdist sends correct EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..) answers.

    Every test sends one query per rule of _config_template (rcode, tc, lua,
    spoof), over both UDP and TCP, and compares the answer received with a
    locally built expected response.
    """

    # NOTE(review): the Lua "function luarule(dq) ... end" wrapper was
    # reconstructed around the visible "return" statement - confirm.
    _config_template = """
    addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    # Payload size expected in self-generated answers; keep in sync with
    # setPayloadSizeOnSelfGeneratedAnswers() in _config_template.
    _selfGeneratedPayloadSize = 1042

    def _selfGeneratedCases(self, label, **queryArgs):
        """
        Yield one (query, expectedResponse) pair per self-generating rule.

        label is the left-most label of the query name; queryArgs are
        forwarded to dns.message.make_query() after the name, 'A' and 'IN'.
        Pairs are built lazily, in the order rcode, tc, lua, spoof, so each
        query is only created right before the caller sends it.
        """
        # our_payload is only relevant when the query carries EDNS, so it is
        # passed unconditionally, including for the no-EDNS queries.
        payload = self._selfGeneratedPayloadSize

        # RCodeAction(DNSRCode.REFUSED)
        name = '%s.rcode.edns-self.tests.powerdns.com.' % label
        query = dns.message.make_query(name, 'A', 'IN', **queryArgs)
        expectedResponse = dns.message.make_response(query, our_payload=payload)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        yield query, expectedResponse

        # TCAction()
        name = '%s.tc.edns-self.tests.powerdns.com.' % label
        query = dns.message.make_query(name, 'A', 'IN', **queryArgs)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=payload)
        expectedResponse.flags |= dns.flags.TC
        yield query, expectedResponse

        # LuaAction(luarule), which returns DNSAction.Nxdomain
        name = '%s.lua.edns-self.tests.powerdns.com.' % label
        query = dns.message.make_query(name, 'A', 'IN', **queryArgs)
        expectedResponse = dns.message.make_response(query, our_payload=payload)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        yield query, expectedResponse

        # SpoofAction({'192.0.2.1', '192.0.2.2'})
        name = '%s.spoof.edns-self.tests.powerdns.com.' % label
        query = dns.message.make_query(name, 'A', 'IN', **queryArgs)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=payload)
        # NOTE(review): the TTL, class and type arguments (60, IN, A) were
        # reconstructed - confirm against the upstream test.
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        yield query, expectedResponse

    def _checkAnswersWithoutEDNS(self, query, expectedResponse):
        """
        Send query over UDP then TCP and check each answer with
        checkMessageNoEDNS() against expectedResponse.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def _checkAnswersWithEDNS(self, query, expectedResponse, expectDO):
        """
        Send query over UDP then TCP and check each answer with
        checkMessageEDNSWithoutOptions() against expectedResponse.

        Also asserts that the DO bit of the answer is set exactly when
        expectDO is true, and that the advertised payload size is the one
        configured for self-generated answers.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
            hasDO = receivedResponse.ednsflags & dns.flags.DO
            if expectDO:
                self.assertTrue(hasDO)
            else:
                self.assertFalse(hasDO)
            # assertEqual, not the deprecated assertEquals alias.
            self.assertEqual(receivedResponse.payload, self._selfGeneratedPayloadSize)

    # NOTE(review): method name reconstructed from its docstring and from the
    # naming of the sibling tests - confirm.
    def testNoEDNS(self):
        """
        EDNS on Self-Generated: No existing EDNS
        """
        for query, expectedResponse in self._selfGeneratedCases('no-edns'):
            self._checkAnswersWithoutEDNS(query, expectedResponse)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=0
        """
        cases = self._selfGeneratedCases('edns-no-do',
                                         use_edns=True, payload=4096, want_dnssec=False)
        for query, expectedResponse in cases:
            self._checkAnswersWithEDNS(query, expectedResponse, expectDO=False)

    def testWithEDNSWithDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=1
        """
        cases = self._selfGeneratedCases('edns-do',
                                         use_edns=True, payload=4096, want_dnssec=True)
        for query, expectedResponse in cases:
            self._checkAnswersWithEDNS(query, expectedResponse, expectDO=True)

    def testWithEDNSNoOptions(self):
        """
        EDNS on Self-Generated: EDNS with options in the query
        """
        # The query carries an EDNS Client Subnet option; the answers are
        # still checked with checkMessageEDNSWithoutOptions().
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        cases = self._selfGeneratedCases('edns-options',
                                         use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        for query, expectedResponse in cases:
            self._checkAnswersWithEDNS(query, expectedResponse, expectDO=True)
class TestEDNSSelfGeneratedDisabled(DNSDistTest):
    """
    Check that dnsdist does not send EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..) when disabled
    """

    # NOTE(review): the Lua "function luarule(dq) ... end" wrapper was
    # reconstructed around the visible "return" statement - confirm.
    _config_template = """
    setAddEDNSToSelfGeneratedResponses(false)

    addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated (disabled): EDNS with DO=0
        """
        def ednsQuery(qname):
            # Every query in this test advertises EDNS with DO=0.
            return dns.message.make_query(qname, 'A', 'IN',
                                          use_edns=True, payload=4096, want_dnssec=False)

        def expectNoEDNS(question, expected):
            # Same check over both transports: the answer must pass
            # checkMessageNoEDNS() against the expected message.
            for transport in ("sendUDPQuery", "sendTCPQuery"):
                send = getattr(self, transport)
                (_, received) = send(question, response=None, useQueue=False)
                self.checkMessageNoEDNS(expected, received)

        # RCodeAction(DNSRCode.REFUSED)
        qname = 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
        question = ednsQuery(qname)
        refused = dns.message.make_response(question)
        refused.set_rcode(dns.rcode.REFUSED)
        expectNoEDNS(question, refused)

        # TCAction()
        qname = 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
        question = ednsQuery(qname)
        # dnsdist sets RA = RD for TC responses
        question.flags &= ~dns.flags.RD
        truncated = dns.message.make_response(question)
        truncated.flags |= dns.flags.TC
        expectNoEDNS(question, truncated)

        # LuaAction(luarule), which returns DNSAction.Nxdomain
        qname = 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
        question = ednsQuery(qname)
        nxdomain = dns.message.make_response(question)
        nxdomain.set_rcode(dns.rcode.NXDOMAIN)
        expectNoEDNS(question, nxdomain)

        # SpoofAction({'192.0.2.1', '192.0.2.2'})
        qname = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
        question = ednsQuery(qname)
        # dnsdist set RA = RD for spoofed responses
        question.flags &= ~dns.flags.RD
        spoofed = dns.message.make_response(question)
        # NOTE(review): the TTL, class and type arguments (60, IN, A) were
        # reconstructed - confirm against the upstream test.
        spoofedRecords = dns.rrset.from_text(qname,
                                             60,
                                             dns.rdataclass.IN,
                                             dns.rdatatype.A,
                                             '192.0.2.1', '192.0.2.2')
        spoofed.answer.append(spoofedRecords)
        expectNoEDNS(question, spoofed)