]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
3 import clientsubnetoption
4 from dnsdisttests
import DNSDistTest
6 class TestBasics(DNSDistTest
):
9 newServer{address="127.0.0.1:%s"}
11 addAction(QTypeRule(DNSQType.ANY), TCAction())
12 addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
13 mySMN = newSuffixMatchNode()
14 mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
15 addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
16 addAction(QNameSuffixRule({"drop.test.powerdns.com.", "drop2.test.powerdns.com."}), DropAction())
17 addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
18 addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
19 addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
def testDropped(self):
    """
    Basics: Drop

    Send an A query for each of the drop.test.powerdns.com. and
    drop2.test.powerdns.com. domains, which are dropped by
    configuration, over both UDP and TCP. We expect no response
    for any of them.
    """
    droppedNames = ['drop.test.powerdns.com.', 'drop2.test.powerdns.com.']
    for droppedName in droppedNames:
        query = dns.message.make_query(droppedName, 'A', 'IN')
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            # No backend response is queued: the query is not expected
            # to produce any answer at all.
            result = sender(query, response=None, useQueue=False)
            receivedResponse = result[1]
            self.assertEqual(receivedResponse, None)
37 def testAWithECS(self
):
39 Basics: A query with an ECS value
41 name
= 'awithecs.tests.powerdns.com.'
42 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
43 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
])
44 response
= dns
.message
.make_response(query
)
45 rrset
= dns
.rrset
.from_text(name
,
51 response
.answer
.append(rrset
)
53 for method
in ("sendUDPQuery", "sendTCPQuery"):
54 sender
= getattr(self
, method
)
55 (receivedQuery
, receivedResponse
) = sender(query
, response
)
56 receivedQuery
.id = query
.id
57 self
.assertEqual(query
, receivedQuery
)
58 self
.assertEqual(response
, receivedResponse
)
60 def testSimpleA(self
):
62 Basics: A query without EDNS
64 name
= 'simplea.tests.powerdns.com.'
65 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
66 response
= dns
.message
.make_response(query
)
67 rrset
= dns
.rrset
.from_text(name
,
72 response
.answer
.append(rrset
)
74 for method
in ("sendUDPQuery", "sendTCPQuery"):
75 sender
= getattr(self
, method
)
76 (receivedQuery
, receivedResponse
) = sender(query
, response
)
77 self
.assertTrue(receivedQuery
)
78 self
.assertTrue(receivedResponse
)
79 receivedQuery
.id = query
.id
80 self
.assertEqual(query
, receivedQuery
)
81 self
.assertEqual(response
, receivedResponse
)
83 def testAnyIsTruncated(self
):
85 Basics: Truncate ANY query
87 dnsdist is configured to reply with TC to ANY queries,
88 send an ANY query and check the result.
89 It should be truncated over UDP, not over TCP.
91 name
= 'any.tests.powerdns.com.'
92 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
93 # dnsdist sets RA = RD for TC responses
94 query
.flags
&= ~dns
.flags
.RD
95 expectedResponse
= dns
.message
.make_response(query
)
96 expectedResponse
.flags |
= dns
.flags
.TC
98 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
99 self
.assertEqual(receivedResponse
, expectedResponse
)
101 response
= dns
.message
.make_response(query
)
102 rrset
= dns
.rrset
.from_text(name
,
108 response
.answer
.append(rrset
)
110 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
111 self
.assertTrue(receivedQuery
)
112 self
.assertTrue(receivedResponse
)
113 receivedQuery
.id = query
.id
114 self
.assertEqual(query
, receivedQuery
)
115 self
.assertEqual(receivedResponse
, response
)
117 def testTruncateTC(self
):
121 dnsdist is configured to truncate TC (default),
122 we make the backend send responses
123 with TC set and additional content,
124 and check that the received response has been fixed.
126 name
= 'atruncatetc.tests.powerdns.com.'
127 query
= dns
.message
.make_query(name
, 'A', 'IN')
128 response
= dns
.message
.make_response(query
)
129 rrset
= dns
.rrset
.from_text(name
,
135 response
.answer
.append(rrset
)
136 response
.flags |
= dns
.flags
.TC
137 expectedResponse
= dns
.message
.make_response(query
)
138 expectedResponse
.flags |
= dns
.flags
.TC
140 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
141 receivedQuery
.id = query
.id
142 self
.assertEqual(query
, receivedQuery
)
143 self
.assertEqual(expectedResponse
.flags
, receivedResponse
.flags
)
144 self
.assertEqual(expectedResponse
.question
, receivedResponse
.question
)
145 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
146 self
.assertEqual(len(receivedResponse
.answer
), 0)
147 self
.assertEqual(len(receivedResponse
.authority
), 0)
148 self
.assertEqual(len(receivedResponse
.additional
), 0)
149 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
151 def testTruncateTCEDNS(self
):
153 Basics: Truncate TC with EDNS
155 dnsdist is configured to truncate TC (default),
156 we make the backend send responses
157 with TC set and additional content,
158 and check that the received response has been fixed.
159 Note that the query and initial response had EDNS,
160 so the final response should have it too.
162 name
= 'atruncatetc.tests.powerdns.com.'
163 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
164 # force a different responder payload than the one in the query,
165 # so we check that we don't just mirror it
166 response
= dns
.message
.make_response(query
, our_payload
=4242)
167 rrset
= dns
.rrset
.from_text(name
,
173 response
.answer
.append(rrset
)
174 response
.flags |
= dns
.flags
.TC
175 expectedResponse
= dns
.message
.make_response(query
, our_payload
=4242)
176 expectedResponse
.flags |
= dns
.flags
.TC
178 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
179 receivedQuery
.id = query
.id
180 self
.assertEqual(query
, receivedQuery
)
181 self
.assertEqual(response
.flags
, receivedResponse
.flags
)
182 self
.assertEqual(response
.question
, receivedResponse
.question
)
183 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
184 self
.assertEqual(len(receivedResponse
.answer
), 0)
185 self
.assertEqual(len(receivedResponse
.authority
), 0)
186 self
.assertEqual(len(receivedResponse
.additional
), 0)
187 print(expectedResponse
)
188 print(receivedResponse
)
189 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
190 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
191 self
.assertEqual(receivedResponse
.payload
, 4242)
def testRegexReturnsRefused(self):
    """
    Basics: Refuse query matching regex

    dnsdist is configured to reply 'refused' for query
    matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
    We send an A query for evil4242.regex.tests.powerdns.com.
    over UDP and TCP and check that the response is "refused".
    """
    qname = 'evil4242.regex.tests.powerdns.com.'
    query = dns.message.make_query(qname, 'A', 'IN')
    # Clear the RD bit on the query before deriving the expected response.
    query.flags = query.flags & ~dns.flags.RD

    refusedResponse = dns.message.make_response(query)
    refusedResponse.set_rcode(dns.rcode.REFUSED)

    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        _, receivedResponse = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, refusedResponse)
213 def testQNameReturnsSpoofed(self
):
215 Basics: test QNameRule and Spoof
217 dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
220 query
= dns
.message
.make_query(name
, 'A', 'IN')
221 query
.flags
&= ~dns
.flags
.RD
222 expectedResponse
= dns
.message
.make_response(query
)
223 expectedResponse
.set_rcode(dns
.rcode
.NOERROR
)
224 rrset
= dns
.rrset
.from_text(name
,
229 expectedResponse
.answer
.append(rrset
)
231 for method
in ("sendUDPQuery", "sendTCPQuery"):
232 sender
= getattr(self
, method
)
233 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
234 self
.assertEqual(receivedResponse
, expectedResponse
)
def testDomainAndQTypeReturnsNotImplemented(self):
    """
    Basics: NOTIMPL for specific name and qtype

    dnsdist is configured to reply 'not implemented' for query
    matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
    We send a TXT query for "nameAndQtype.tests.powerdns.com."
    over UDP and TCP and check that the response is
    'not implemented'.
    """
    qname = 'nameAndQtype.tests.powerdns.com.'
    query = dns.message.make_query(qname, 'TXT', 'IN')
    # Clear the RD bit on the query before deriving the expected response.
    query.flags = query.flags & ~dns.flags.RD

    notImpResponse = dns.message.make_response(query)
    notImpResponse.set_rcode(dns.rcode.NOTIMP)

    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        _, receivedResponse = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, notImpResponse)
256 def testDomainWithoutQTypeIsNotAffected(self
):
258 Basics: NOTIMPL qtype canary
260 dnsdist is configured to reply 'not implemented' for query
261 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
262 We send an A query for "nameAndQtype.tests.powerdns.com."
263 and check that the response is OK.
265 name
= 'nameAndQtype.tests.powerdns.com.'
266 query
= dns
.message
.make_query(name
, 'A', 'IN')
267 response
= dns
.message
.make_response(query
)
268 rrset
= dns
.rrset
.from_text(name
,
273 response
.answer
.append(rrset
)
275 for method
in ("sendUDPQuery", "sendTCPQuery"):
276 sender
= getattr(self
, method
)
277 (receivedQuery
, receivedResponse
) = sender(query
, response
)
278 self
.assertTrue(receivedQuery
)
279 self
.assertTrue(receivedResponse
)
280 receivedQuery
.id = query
.id
281 self
.assertEqual(query
, receivedQuery
)
282 self
.assertEqual(response
, receivedResponse
)
284 def testOtherDomainANDQTypeIsNotAffected(self
):
286 Basics: NOTIMPL qname canary
288 dnsdist is configured to reply 'not implemented' for query
289 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
290 We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
291 and check that the response is OK.
293 name
= 'OtherNameAndQtype.tests.powerdns.com.'
294 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
295 response
= dns
.message
.make_response(query
)
296 rrset
= dns
.rrset
.from_text(name
,
300 'nothing to see here')
301 response
.answer
.append(rrset
)
303 for method
in ("sendUDPQuery", "sendTCPQuery"):
304 sender
= getattr(self
, method
)
305 (receivedQuery
, receivedResponse
) = sender(query
, response
)
306 self
.assertTrue(receivedQuery
)
307 self
.assertTrue(receivedResponse
)
308 receivedQuery
.id = query
.id
309 self
.assertEqual(query
, receivedQuery
)
310 self
.assertEqual(response
, receivedResponse
)
312 def testWrongResponse(self
):
314 Basics: Unrelated response from the backend
316 The backend sends an unrelated answer over UDP, it should
317 be discarded by dnsdist. It could happen if we wrap around
318 maxOutstanding queries too quickly or have more than maxOutstanding
319 queries to a specific backend in the air over UDP,
320 but does not really make sense over TCP.
322 name
= 'query.unrelated.tests.powerdns.com.'
323 unrelatedName
= 'answer.unrelated.tests.powerdns.com.'
324 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
325 unrelatedQuery
= dns
.message
.make_query(unrelatedName
, 'TXT', 'IN')
326 unrelatedResponse
= dns
.message
.make_response(unrelatedQuery
)
327 rrset
= dns
.rrset
.from_text(unrelatedName
,
331 'nothing to see here')
332 unrelatedResponse
.answer
.append(rrset
)
334 for method
in ("sendUDPQuery", "sendTCPQuery"):
335 sender
= getattr(self
, method
)
336 (receivedQuery
, receivedResponse
) = sender(query
, unrelatedResponse
)
337 self
.assertTrue(receivedQuery
)
338 self
.assertEqual(receivedResponse
, None)
339 receivedQuery
.id = query
.id
340 self
.assertEqual(query
, receivedQuery
)
def testHeaderOnlyRefused(self):
    """
    Basics: Header-only refused response

    The backend answers with a REFUSED response whose question
    section has been emptied. Over UDP and TCP, we check that the
    backend received our query and that the response we get back
    equals the one the backend sent.
    """
    qname = 'header-only-refused-response.tests.powerdns.com.'
    query = dns.message.make_query(qname, 'A', 'IN')

    headerOnlyResponse = dns.message.make_response(query)
    headerOnlyResponse.set_rcode(dns.rcode.REFUSED)
    # Strip the question section so only the header remains.
    headerOnlyResponse.question = []

    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = sender(query, headerOnlyResponse)
        self.assertTrue(receivedQuery)
        # Align the IDs before comparing the received query with ours.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, headerOnlyResponse)
def testHeaderOnlyNoErrorResponse(self):
    """
    Basics: Header-only NoError response should be dropped

    The backend answers with a response whose question section has
    been emptied and whose rcode is left as created by
    make_response(). Over UDP and TCP, we check that the backend
    received our query and that we get no response back.
    """
    qname = 'header-only-noerror-response.tests.powerdns.com.'
    query = dns.message.make_query(qname, 'A', 'IN')

    headerOnlyResponse = dns.message.make_response(query)
    # Strip the question section so only the header remains.
    headerOnlyResponse.question = []

    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = sender(query, headerOnlyResponse)
        self.assertTrue(receivedQuery)
        # Align the IDs before comparing the received query with ours.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, None)
def testHeaderOnlyNXDResponse(self):
    """
    Basics: Header-only NXD response should be dropped

    The backend answers with an NXDOMAIN response whose question
    section has been emptied. Over UDP and TCP, we check that the
    backend received our query and that we get no response back.
    """
    qname = 'header-only-nxd-response.tests.powerdns.com.'
    query = dns.message.make_query(qname, 'A', 'IN')

    headerOnlyResponse = dns.message.make_response(query)
    headerOnlyResponse.set_rcode(dns.rcode.NXDOMAIN)
    # Strip the question section so only the header remains.
    headerOnlyResponse.question = []

    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        receivedQuery, receivedResponse = sender(query, headerOnlyResponse)
        self.assertTrue(receivedQuery)
        # Align the IDs before comparing the received query with ours.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, None)
def testAddActionDNSName(self):
    """
    Basics: test if addAction accepts a DNSName

    The configuration passes newDNSName("dnsname.addaction.powerdns.com.")
    directly to addAction() with a REFUSED action. We send an A query
    for that name over UDP and TCP and expect a REFUSED response.
    """
    qname = 'dnsname.addaction.powerdns.com.'
    query = dns.message.make_query(qname, 'A', 'IN')
    # Clear the RD bit on the query before deriving the expected response.
    query.flags = query.flags & ~dns.flags.RD

    refusedResponse = dns.message.make_response(query)
    refusedResponse.set_rcode(dns.rcode.REFUSED)

    for sender in (self.sendUDPQuery, self.sendTCPQuery):
        _, receivedResponse = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, refusedResponse)
def testAddActionDNSNames(self):
    """
    Basics: test if addAction accepts a table of DNSNames

    The configuration passes a table holding
    dnsname-table1.addaction.powerdns.com. and
    dnsname-table2.addaction.powerdns.com. to addAction() with a
    REFUSED action. We send an A query for each of the two names
    over UDP and TCP and expect a REFUSED response every time.
    """
    # range(1, 3) yields 1 and 2. The previous range(1, 2) stopped after
    # table1, so the second name of the table was never exercised.
    for name in ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1, 3)]:
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear the RD bit on the query before deriving the expected response.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
def testEmptyQueries(self):
    """
    Basics: NotImp on empty queries

    Send a freshly constructed dns.message.Message() (no question is
    added to it) over UDP and TCP, and check that the response we
    receive equals the NOTIMP response derived from that query.
    """
    # The query is deliberately built without a name: the previously
    # assigned local 'name' was never used and has been removed.
    query = dns.message.Message()

    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.NOTIMP)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, response)