# Source: thirdparty/pdns.git (git.ipfire.org) - regression-tests.dnsdist/test_EDNSSelfGenerated.py
from datetime import datetime, timedelta

import clientsubnetoption
import dns

from dnsdisttests import DNSDistTest
class TestEDNSSelfGenerated(DNSDistTest):
    """
    Check that dnsdist sends correct EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..)
    """

    # NOTE(review): the `function luarule(dq)` / `end` lines were missing from
    # the listing this class was restored from; they are implied by the
    # `return` statement and by `LuaAction(luarule)` - confirm against upstream.
    _config_template = """
    addAction("rcode.edns-self.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def _checkSelfGenerated(self, query, expectedResponse, expectDO,
                            methods=("sendUDPQuery", "sendTCPQuery")):
        """
        Send `query` through every sender named in `methods` and verify the
        self-generated answer.

        query            -- the dns.message query to send
        expectedResponse -- the message the answer is compared against
        expectDO         -- None when the answer must carry no EDNS at all;
                            otherwise a bool giving the expected state of the
                            DO bit, in which case the answer must also carry
                            EDNS without options and a payload size of 1042
                            (the value set in _config_template)
        methods          -- names of the sender methods to use
        """
        for method in methods:
            sender = getattr(self, method)
            # response=None / useQueue=False: no backend answer is supplied,
            # the reply is expected to come from dnsdist itself
            (_, receivedResponse) = sender(query, response=None, useQueue=False)

            if expectDO is None:
                self.checkMessageNoEDNS(expectedResponse, receivedResponse)
                continue

            self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
            doBit = receivedResponse.ednsflags & dns.flags.DO
            if expectDO:
                self.assertTrue(doBit)
            else:
                self.assertFalse(doBit)
            self.assertEqual(receivedResponse.payload, 1042)

    def _spoofedAnswer(self, name):
        """
        Build the A rrset expected from the SpoofAction rule for `name`.
        """
        # NOTE(review): the TTL / class / type arguments were missing from the
        # listing this class was restored from; 60 / IN / A is a reconstruction
        # - confirm against upstream.
        return dns.rrset.from_text(name,
                                   60,
                                   dns.rdataclass.IN,
                                   dns.rdatatype.A,
                                   '192.0.2.1', '192.0.2.2')

    # NOTE(review): the `def` line of this test was missing from the listing;
    # the name is reconstructed from the docstring - confirm against upstream.
    def testNoEDNS(self):
        """
        EDNS on Self-Generated: No existing EDNS
        """
        name = 'no-edns.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGenerated(query, expectedResponse, expectDO=None)

        name = 'no-edns.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC
        # the TC rule is only exercised over UDP
        self._checkSelfGenerated(query, expectedResponse, expectDO=None,
                                 methods=("sendUDPQuery",))

        name = 'no-edns.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGenerated(query, expectedResponse, expectDO=None)

        name = 'no-edns.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(self._spoofedAnswer(name))
        self._checkSelfGenerated(query, expectedResponse, expectDO=None)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=0
        """
        ednsArgs = {'use_edns': True, 'payload': 4096, 'want_dnssec': False}

        name = 'edns-no-do.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGenerated(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        # the TC rule is only exercised over UDP
        self._checkSelfGenerated(query, expectedResponse, expectDO=False,
                                 methods=("sendUDPQuery",))

        name = 'edns-no-do.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGenerated(query, expectedResponse, expectDO=False)

        name = 'edns-no-do.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(self._spoofedAnswer(name))
        self._checkSelfGenerated(query, expectedResponse, expectDO=False)

    def testWithEDNSWithDO(self):
        """
        EDNS on Self-Generated: EDNS with DO=1
        """
        ednsArgs = {'use_edns': True, 'payload': 4096, 'want_dnssec': True}

        name = 'edns-do.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGenerated(query, expectedResponse, expectDO=True)

        name = 'edns-do.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        # the TC rule is only exercised over UDP
        self._checkSelfGenerated(query, expectedResponse, expectDO=True,
                                 methods=("sendUDPQuery",))

        name = 'edns-do.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGenerated(query, expectedResponse, expectDO=True)

        name = 'edns-do.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(self._spoofedAnswer(name))
        self._checkSelfGenerated(query, expectedResponse, expectDO=True)

    # NOTE(review): despite the "NoOptions" name, every query below carries an
    # EDNS Client Subnet option; what is checked is that the *answer* has no
    # options. Name kept as-is so the test id does not change.
    def testWithEDNSNoOptions(self):
        """
        EDNS on Self-Generated: EDNS with options in the query
        """
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        ednsArgs = {'use_edns': True, 'options': [ecso], 'payload': 512, 'want_dnssec': True}

        name = 'edns-options.rcode.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkSelfGenerated(query, expectedResponse, expectDO=True)

        name = 'edns-options.tc.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.flags |= dns.flags.TC
        # the TC rule is only exercised over UDP
        self._checkSelfGenerated(query, expectedResponse, expectDO=True,
                                 methods=("sendUDPQuery",))

        name = 'edns-options.lua.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkSelfGenerated(query, expectedResponse, expectDO=True)

        name = 'edns-options.spoof.edns-self.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query, our_payload=1042)
        expectedResponse.answer.append(self._spoofedAnswer(name))
        self._checkSelfGenerated(query, expectedResponse, expectDO=True)
class TestEDNSSelfGeneratedDisabled(DNSDistTest):
    """
    Check that dnsdist does not send EDNS data on
    self-generated (RCodeAction(), TCAction(), Lua..) when disabled
    """

    # NOTE(review): the `function luarule(dq)` / `end` lines were missing from
    # the listing this class was restored from; they are implied by the
    # `return` statement and by `LuaAction(luarule)` - confirm against upstream.
    _config_template = """
    setAddEDNSToSelfGeneratedResponses(false)

    addAction("rcode.edns-self-disabled.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("tc.edns-self-disabled.tests.powerdns.com.", TCAction())

    function luarule(dq)
      return DNSAction.Nxdomain, ""
    end

    addAction("lua.edns-self-disabled.tests.powerdns.com.", LuaAction(luarule))

    addAction("spoof.edns-self-disabled.tests.powerdns.com.", SpoofAction({'192.0.2.1', '192.0.2.2'}))

    setPayloadSizeOnSelfGeneratedAnswers(1042)

    newServer{address="127.0.0.1:%s"}
    """

    def _checkNoEDNS(self, query, expectedResponse,
                     methods=("sendUDPQuery", "sendTCPQuery")):
        """
        Send `query` through every sender named in `methods` and verify that
        the self-generated answer matches `expectedResponse` and carries no
        EDNS.
        """
        for method in methods:
            sender = getattr(self, method)
            # response=None / useQueue=False: no backend answer is supplied,
            # the reply is expected to come from dnsdist itself
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def testWithEDNSNoDO(self):
        """
        EDNS on Self-Generated (disabled): EDNS with DO=0
        """
        ednsArgs = {'use_edns': True, 'payload': 4096, 'want_dnssec': False}

        name = 'edns-no-do.rcode.edns-self-disabled.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        self._checkNoEDNS(query, expectedResponse)

        name = 'edns-no-do.tc.edns-self-disabled.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC
        # the TC rule is only exercised over UDP
        self._checkNoEDNS(query, expectedResponse, methods=("sendUDPQuery",))

        name = 'edns-no-do.lua.edns-self-disabled.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
        self._checkNoEDNS(query, expectedResponse)

        name = 'edns-no-do.spoof.edns-self-disabled.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', **ednsArgs)
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        # NOTE(review): the TTL / class / type arguments were missing from the
        # listing this class was restored from; 60 / IN / A is a reconstruction
        # - confirm against upstream.
        expectedResponse.answer.append(dns.rrset.from_text(name,
                                                           60,
                                                           dns.rdataclass.IN,
                                                           dns.rdatatype.A,
                                                           '192.0.2.1', '192.0.2.2'))
        self._checkNoEDNS(query, expectedResponse)