]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSSelfGenerated.py
3 import clientsubnetoption
4 from dnsdisttests
import DNSDistTest
5 from datetime
import datetime
, timedelta
7 class TestEDNSSelfGenerated(DNSDistTest
):
9 Check that dnsdist sends correct EDNS data on
10 self-generated (RCodeAction(), TCAction(), Lua..)
13 _config_template
= """
14 addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(dnsdist.REFUSED))
15 addAction("tc.edns-self.tests.powerdns.com.", TCAction())
18 return DNSAction.Nxdomain, ""
21 addLuaAction("lua.edns-self.tests.powerdns.com.", luarule)
23 addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
25 setPayloadSizeOnSelfGeneratedAnswers(1042)
27 newServer{address="127.0.0.1:%s"}
32 EDNS on Self-Generated: No existing EDNS
34 name
= 'no-edns.rcode.edns-self.tests.powerdns.com.'
35 query
= dns
.message
.make_query(name
, 'A', 'IN')
36 expectedResponse
= dns
.message
.make_response(query
)
37 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
39 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
40 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
42 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
43 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
45 name
= 'no-edns.tc.edns-self.tests.powerdns.com.'
46 query
= dns
.message
.make_query(name
, 'A', 'IN')
47 expectedResponse
= dns
.message
.make_response(query
)
48 expectedResponse
.flags |
= dns
.flags
.TC
50 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
51 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
53 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
54 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
56 name
= 'no-edns.lua.edns-self.tests.powerdns.com.'
57 query
= dns
.message
.make_query(name
, 'A', 'IN')
58 expectedResponse
= dns
.message
.make_response(query
)
59 expectedResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
61 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
62 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
64 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
65 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
67 name
= 'no-edns.spoof.edns-self.tests.powerdns.com.'
68 query
= dns
.message
.make_query(name
, 'A', 'IN')
69 # dnsdist set RA = RD for spoofed responses
70 query
.flags
&= ~dns
.flags
.RD
71 expectedResponse
= dns
.message
.make_response(query
)
72 expectedResponse
.answer
.append(dns
.rrset
.from_text(name
,
76 '192.0.2.1', '192.0.2.2'))
78 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
79 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
81 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
82 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
84 def testWithEDNSNoDO(self
):
86 EDNS on Self-Generated: EDNS with DO=0
88 name
= 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
89 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
90 expectedResponse
= dns
.message
.make_response(query
)
91 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
93 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
94 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
95 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
96 self
.assertEquals(receivedResponse
.payload
, 1042)
98 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
99 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
100 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
101 self
.assertEquals(receivedResponse
.payload
, 1042)
103 name
= 'edns-no-do.tc.edns-self.tests.powerdns.com.'
104 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
105 expectedResponse
= dns
.message
.make_response(query
)
106 expectedResponse
.flags |
= dns
.flags
.TC
108 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
109 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
110 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
111 self
.assertEquals(receivedResponse
.payload
, 1042)
113 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
114 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
115 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
116 self
.assertEquals(receivedResponse
.payload
, 1042)
118 name
= 'edns-no-do.lua.edns-self.tests.powerdns.com.'
119 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
120 expectedResponse
= dns
.message
.make_response(query
)
121 expectedResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
123 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
124 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
125 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
126 self
.assertEquals(receivedResponse
.payload
, 1042)
128 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
129 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
130 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
131 self
.assertEquals(receivedResponse
.payload
, 1042)
133 name
= 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
134 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
135 # dnsdist set RA = RD for spoofed responses
136 query
.flags
&= ~dns
.flags
.RD
137 expectedResponse
= dns
.message
.make_response(query
)
138 expectedResponse
.answer
.append(dns
.rrset
.from_text(name
,
142 '192.0.2.1', '192.0.2.2'))
144 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
145 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
146 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
147 self
.assertEquals(receivedResponse
.payload
, 1042)
149 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
150 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
151 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
152 self
.assertEquals(receivedResponse
.payload
, 1042)
154 def testWithEDNSWithDO(self
):
156 EDNS on Self-Generated: EDNS with DO=1
158 name
= 'edns-do.rcode.edns-self.tests.powerdns.com.'
159 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
160 expectedResponse
= dns
.message
.make_response(query
)
161 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
163 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
164 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
165 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
166 self
.assertEquals(receivedResponse
.payload
, 1042)
168 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
169 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
170 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
171 self
.assertEquals(receivedResponse
.payload
, 1042)
173 name
= 'edns-do.tc.edns-self.tests.powerdns.com.'
174 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
175 expectedResponse
= dns
.message
.make_response(query
)
176 expectedResponse
.flags |
= dns
.flags
.TC
178 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
179 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
180 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
181 self
.assertEquals(receivedResponse
.payload
, 1042)
183 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
184 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
185 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
186 self
.assertEquals(receivedResponse
.payload
, 1042)
188 name
= 'edns-do.lua.edns-self.tests.powerdns.com.'
189 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
190 expectedResponse
= dns
.message
.make_response(query
)
191 expectedResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
193 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
194 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
195 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
196 self
.assertEquals(receivedResponse
.payload
, 1042)
198 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
199 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
200 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
201 self
.assertEquals(receivedResponse
.payload
, 1042)
203 name
= 'edns-do.spoof.edns-self.tests.powerdns.com.'
204 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
205 # dnsdist set RA = RD for spoofed responses
206 query
.flags
&= ~dns
.flags
.RD
207 expectedResponse
= dns
.message
.make_response(query
)
208 expectedResponse
.answer
.append(dns
.rrset
.from_text(name
,
212 '192.0.2.1', '192.0.2.2'))
214 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
215 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
216 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
217 self
.assertEquals(receivedResponse
.payload
, 1042)
219 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
220 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
221 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
222 self
.assertEquals(receivedResponse
.payload
, 1042)
224 def testWithEDNSNoOptions(self
):
226 EDNS on Self-Generated: EDNS with options in the query
228 name
= 'edns-options.rcode.edns-self.tests.powerdns.com.'
229 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
230 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512, want_dnssec
=True)
231 expectedResponse
= dns
.message
.make_response(query
)
232 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
234 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
235 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
236 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
237 self
.assertEquals(receivedResponse
.payload
, 1042)
239 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
240 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
241 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
242 self
.assertEquals(receivedResponse
.payload
, 1042)
244 name
= 'edns-options.tc.edns-self.tests.powerdns.com.'
245 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512, want_dnssec
=True)
246 expectedResponse
= dns
.message
.make_response(query
)
247 expectedResponse
.flags |
= dns
.flags
.TC
249 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
250 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
251 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
252 self
.assertEquals(receivedResponse
.payload
, 1042)
254 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
255 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
256 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
257 self
.assertEquals(receivedResponse
.payload
, 1042)
259 name
= 'edns-options.lua.edns-self.tests.powerdns.com.'
260 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512, want_dnssec
=True)
261 expectedResponse
= dns
.message
.make_response(query
)
262 expectedResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
264 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
265 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
266 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
267 self
.assertEquals(receivedResponse
.payload
, 1042)
269 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
270 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
271 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
272 self
.assertEquals(receivedResponse
.payload
, 1042)
274 name
= 'edns-options.spoof.edns-self.tests.powerdns.com.'
275 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512, want_dnssec
=True)
276 # dnsdist set RA = RD for spoofed responses
277 query
.flags
&= ~dns
.flags
.RD
278 expectedResponse
= dns
.message
.make_response(query
)
279 expectedResponse
.answer
.append(dns
.rrset
.from_text(name
,
283 '192.0.2.1', '192.0.2.2'))
285 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
286 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
287 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
288 self
.assertEquals(receivedResponse
.payload
, 1042)
290 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
291 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
292 self
.assertTrue(receivedResponse
.ednsflags
& dns
.flags
.DO
)
293 self
.assertEquals(receivedResponse
.payload
, 1042)
296 class TestEDNSSelfGeneratedDisabled(DNSDistTest
):
298 Check that dnsdist does not send EDNS data on
299 self-generated (RCodeAction(), TCAction(), Lua..) when disabled
302 _config_template
= """
303 setAddEDNSToSelfGeneratedResponses(false)
305 addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(dnsdist.REFUSED))
306 addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())
309 return DNSAction.Nxdomain, ""
312 addLuaAction("lua.edns-self-disabled.tests.powerdns.com.", luarule)
314 addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))
316 setPayloadSizeOnSelfGeneratedAnswers(1042)
318 newServer{address="127.0.0.1:%s"}
321 def testWithEDNSNoDO(self
):
323 EDNS on Self-Generated (disabled): EDNS with DO=0
325 name
= 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
326 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
327 expectedResponse
= dns
.message
.make_response(query
)
328 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
330 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
331 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
333 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
334 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
336 name
= 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
337 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
338 expectedResponse
= dns
.message
.make_response(query
)
339 expectedResponse
.flags |
= dns
.flags
.TC
341 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
342 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
344 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
345 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
347 name
= 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
348 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
349 expectedResponse
= dns
.message
.make_response(query
)
350 expectedResponse
.set_rcode(dns
.rcode
.NXDOMAIN
)
352 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
353 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
355 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
356 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
358 name
= 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
359 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=False)
360 # dnsdist set RA = RD for spoofed responses
361 query
.flags
&= ~dns
.flags
.RD
362 expectedResponse
= dns
.message
.make_response(query
)
363 expectedResponse
.answer
.append(dns
.rrset
.from_text(name
,
367 '192.0.2.1', '192.0.2.2'))
369 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
370 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
372 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
373 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)