]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSSelfGenerated.py
3 import clientsubnetoption
4 from dnsdisttests
import DNSDistTest
5 from datetime
import datetime
, timedelta
class TestEDNSSelfGenerated(DNSDistTest):
    """
    Check that dnsdist sends correct EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..)
    """

    # NOTE(review): the `function luarule(dq)` / `end` lines around the Lua
    # `return` were missing from the extracted source and have been
    # reconstructed from the `LuaAction(luarule)` reference - confirm.
    _config_template = """
    addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    @staticmethod
    def _spoofedAnswer(name):
        """
        Build the A rrset matching the addresses configured in SpoofAction().
        """
        # NOTE(review): the TTL / class / type arguments were lost in the
        # extracted source. IN / A follow from the 'A', 'IN' queries and the
        # IPv4 addresses; the TTL of 60 is an assumption - confirm.
        return dns.rrset.from_text(name,
                                   60,
                                   dns.rdataclass.IN,
                                   dns.rdatatype.A,
                                   '192.0.2.1', '192.0.2.2')

    def _answersOverUDPAndTCP(self, query):
        """
        Send `query` with sendUDPQuery, then with sendTCPQuery (both called
        with response=None, useQueue=False) and yield each received response.

        This is a generator, so the caller's checks on the UDP answer run
        before the TCP query is sent.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            yield receivedResponse

    def _checkEDNSAnswers(self, query, expectedResponse, expectDO):
        """
        Check, over UDP and TCP, that the answer to `query` matches
        `expectedResponse`, carries EDNS without options, has the DO bit
        set if and only if `expectDO` is true, and advertises the
        payload size of 1042 set in the configuration.
        """
        for receivedResponse in self._answersOverUDPAndTCP(query):
            self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
            if expectDO:
                self.assertTrue(receivedResponse.ednsflags & dns.flags.DO)
            else:
                self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
            # assertEquals is a deprecated alias that was removed in Python 3.12
            self.assertEqual(receivedResponse.payload, 1042)

    # NOTE(review): the `def` line of this test was missing from the extracted
    # source; the name testNoEDNS is reconstructed from the docstring - confirm.
    def testNoEDNS(self):
        """
        EDNS on Self-Generated: No existing EDNS
        """
        name = 'no-edns.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for receivedResponse in self._answersOverUDPAndTCP(query):
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

        name = 'no-edns.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        for receivedResponse in self._answersOverUDPAndTCP(query):
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

        name = 'no-edns.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)

        for receivedResponse in self._answersOverUDPAndTCP(query):
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

        name = 'no-edns.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        # NOTE(review): our_payload=1042 is kept from the original even though
        # the answer is then checked with checkMessageNoEDNS - confirm intent.
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(self._spoofedAnswer(name))

        for receivedResponse in self._answersOverUDPAndTCP(query):
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=0
        """
        name = 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkEDNSAnswers(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        self._checkEDNSAnswers(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkEDNSAnswers(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(self._spoofedAnswer(name))
        self._checkEDNSAnswers(query, expectedResponse, expectDO=False)

    def testWithEDNSWithDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=1
        """
        name = 'edns-do.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkEDNSAnswers(query, expectedResponse, expectDO=True)

        name = 'edns-do.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        self._checkEDNSAnswers(query, expectedResponse, expectDO=True)

        name = 'edns-do.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkEDNSAnswers(query, expectedResponse, expectDO=True)

        name = 'edns-do.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(self._spoofedAnswer(name))
        self._checkEDNSAnswers(query, expectedResponse, expectDO=True)

    def testWithEDNSNoOptions(self):
        """
        EDNS on Self-Generated: EDNS with options in the query
        """
        name = 'edns-options.rcode.edns-self.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkEDNSAnswers(query, expectedResponse, expectDO=True)

        name = 'edns-options.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        self._checkEDNSAnswers(query, expectedResponse, expectDO=True)

        name = 'edns-options.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkEDNSAnswers(query, expectedResponse, expectDO=True)

        name = 'edns-options.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512, want_dnssec=True)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(self._spoofedAnswer(name))
        self._checkEDNSAnswers(query, expectedResponse, expectDO=True)

class TestEDNSSelfGeneratedDisabled(DNSDistTest):
    """
    Check that dnsdist does not send EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..) when disabled
    """

    # NOTE(review): the `function luarule(dq)` / `end` lines around the Lua
    # `return` were missing from the extracted source and have been
    # reconstructed from the `LuaAction(luarule)` reference - confirm.
    _config_template = """
    setAddEDNSToSelfGeneratedResponses(false)

    addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def _checkNoEDNSAnswers(self, query, expectedResponse):
        """
        Send `query` with sendUDPQuery, then with sendTCPQuery (both called
        with response=None, useQueue=False) and check each received response
        against `expectedResponse` using checkMessageNoEDNS.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated (disabled): EDNS with DO=0
        """
        name = 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkNoEDNSAnswers(query, expectedResponse)

        name = 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC
        self._checkNoEDNSAnswers(query, expectedResponse)

        name = 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkNoEDNSAnswers(query, expectedResponse)

        name = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=False)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): the TTL / class / type arguments were lost in the
        # extracted source. IN / A follow from the 'A', 'IN' query and the
        # IPv4 addresses; the TTL of 60 is an assumption - confirm.
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkNoEDNSAnswers(query, expectedResponse)