]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class TestBasics(DNSDistTest
):
10 newServer{address="127.0.0.1:%s"}
12 addAction(AndRule{QTypeRule(dnsdist.ANY), TCPRule(false)}, TCAction())
13 addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
14 mySMN = newSuffixMatchNode()
15 mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
16 addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
17 addAction(makeRule("drop.test.powerdns.com."), DropAction())
18 addAction(AndRule({QTypeRule(dnsdist.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
19 addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
20 addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
def testDropped(self):
    """
    Basics: Dropped query gets no response

    Send an A query for drop.test.powerdns.com. domain,
    which is dropped by configuration. We expect
    no response, over UDP as well as over TCP.
    """
    name = 'drop.test.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # The expectation is identical for both transports, so run the same
    # check through each sender instead of duplicating the stanza.
    # NOTE(review): response=None / useQueue=False presumably means no
    # backend answer is queued for this query -- confirm in dnsdisttests.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertIsNone replaces the deprecated assertEquals(x, None) alias.
        self.assertIsNone(receivedResponse)
39 def testAWithECS(self
):
41 Basics: A query with an ECS value
43 name
= 'awithecs.tests.powerdns.com.'
44 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
45 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
])
46 response
= dns
.message
.make_response(query
)
47 rrset
= dns
.rrset
.from_text(name
,
53 response
.answer
.append(rrset
)
55 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
56 receivedQuery
.id = query
.id
57 self
.assertEquals(query
, receivedQuery
)
58 self
.assertEquals(response
, receivedResponse
)
60 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
61 receivedQuery
.id = query
.id
62 self
.assertEquals(query
, receivedQuery
)
63 self
.assertEquals(response
, receivedResponse
)
65 def testSimpleA(self
):
67 Basics: A query without EDNS
69 name
= 'simplea.tests.powerdns.com.'
70 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
71 response
= dns
.message
.make_response(query
)
72 rrset
= dns
.rrset
.from_text(name
,
77 response
.answer
.append(rrset
)
79 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
80 self
.assertTrue(receivedQuery
)
81 self
.assertTrue(receivedResponse
)
82 receivedQuery
.id = query
.id
83 self
.assertEquals(query
, receivedQuery
)
84 self
.assertEquals(response
, receivedResponse
)
86 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
87 self
.assertTrue(receivedQuery
)
88 self
.assertTrue(receivedResponse
)
89 receivedQuery
.id = query
.id
90 self
.assertEquals(query
, receivedQuery
)
91 self
.assertEquals(response
, receivedResponse
)
93 def testAnyIsTruncated(self
):
95 Basics: Truncate ANY query
97 dnsdist is configured to reply with TC to ANY queries,
98 send an ANY query and check the result.
99 It should be truncated over UDP, not over TCP.
101 name
= 'any.tests.powerdns.com.'
102 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
103 expectedResponse
= dns
.message
.make_response(query
)
104 expectedResponse
.flags |
= dns
.flags
.TC
106 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
107 self
.assertEquals(receivedResponse
, expectedResponse
)
109 response
= dns
.message
.make_response(query
)
110 rrset
= dns
.rrset
.from_text(name
,
116 response
.answer
.append(rrset
)
118 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
119 self
.assertTrue(receivedQuery
)
120 self
.assertTrue(receivedResponse
)
121 receivedQuery
.id = query
.id
122 self
.assertEquals(query
, receivedQuery
)
123 self
.assertEquals(receivedResponse
, response
)
125 def testTruncateTC(self
):
129 dnsdist is configured to truncate TC (default),
130 we make the backend send responses
131 with TC set and additional content,
132 and check that the received response has been fixed.
134 name
= 'atruncatetc.tests.powerdns.com.'
135 query
= dns
.message
.make_query(name
, 'A', 'IN')
136 response
= dns
.message
.make_response(query
)
137 rrset
= dns
.rrset
.from_text(name
,
143 response
.answer
.append(rrset
)
144 response
.flags |
= dns
.flags
.TC
145 expectedResponse
= dns
.message
.make_response(query
)
146 expectedResponse
.flags |
= dns
.flags
.TC
148 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
149 receivedQuery
.id = query
.id
150 self
.assertEquals(query
, receivedQuery
)
151 self
.assertEquals(expectedResponse
.flags
, receivedResponse
.flags
)
152 self
.assertEquals(expectedResponse
.question
, receivedResponse
.question
)
153 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
154 self
.assertEquals(len(receivedResponse
.answer
), 0)
155 self
.assertEquals(len(receivedResponse
.authority
), 0)
156 self
.assertEquals(len(receivedResponse
.additional
), 0)
157 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
159 def testTruncateTCEDNS(self
):
161 Basics: Truncate TC with EDNS
163 dnsdist is configured to truncate TC (default),
164 we make the backend send responses
165 with TC set and additional content,
166 and check that the received response has been fixed.
167 Note that the query and initial response had EDNS,
168 so the final response should have it too.
170 name
= 'atruncatetc.tests.powerdns.com.'
171 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
172 response
= dns
.message
.make_response(query
)
173 # force a different responder payload than the one in the query,
174 # so we check that we don't just mirror it
175 response
.payload
= 4242
176 rrset
= dns
.rrset
.from_text(name
,
182 response
.answer
.append(rrset
)
183 response
.flags |
= dns
.flags
.TC
184 expectedResponse
= dns
.message
.make_response(query
)
185 expectedResponse
.payload
= 4242
186 expectedResponse
.flags |
= dns
.flags
.TC
188 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
189 receivedQuery
.id = query
.id
190 self
.assertEquals(query
, receivedQuery
)
191 self
.assertEquals(response
.flags
, receivedResponse
.flags
)
192 self
.assertEquals(response
.question
, receivedResponse
.question
)
193 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
194 self
.assertEquals(len(receivedResponse
.answer
), 0)
195 self
.assertEquals(len(receivedResponse
.authority
), 0)
196 self
.assertEquals(len(receivedResponse
.additional
), 0)
197 print(expectedResponse
)
198 print(receivedResponse
)
199 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
200 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
201 self
.assertEquals(receivedResponse
.payload
, 4242)
def testRegexReturnsRefused(self):
    """
    Basics: Refuse query matching regex

    dnsdist is configured to reply 'refused' for query
    matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
    We send a query for evil4242.regex.tests.powerdns.com
    and check that the response is "refused".
    """
    name = 'evil4242.regex.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # The answer we expect: a plain response to our query carrying REFUSED.
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    # Same expectation over UDP and TCP.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEqual: the assertEquals alias is deprecated in unittest.
        self.assertEqual(receivedResponse, expectedResponse)
223 def testQNameReturnsSpoofed(self
):
225 Basics: test QNameRule and Spoof
227 dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
230 query
= dns
.message
.make_query(name
, 'A', 'IN')
231 query
.flags
&= ~dns
.flags
.RD
232 expectedResponse
= dns
.message
.make_response(query
)
233 expectedResponse
.set_rcode(dns
.rcode
.NOERROR
)
234 rrset
= dns
.rrset
.from_text(name
,
239 expectedResponse
.answer
.append(rrset
)
241 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
242 self
.assertEquals(receivedResponse
, expectedResponse
)
244 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
245 self
.assertEquals(receivedResponse
, expectedResponse
)
def testDomainAndQTypeReturnsNotImplemented(self):
    """
    Basics: NOTIMPL for specific name and qtype

    dnsdist is configured to reply 'not implemented' for query
    matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
    We send a TXT query for "nameAndQtype.tests.powerdns.com."
    and check that the response is 'not implemented'.
    """
    name = 'nameAndQtype.tests.powerdns.com.'
    query = dns.message.make_query(name, 'TXT', 'IN')

    # The answer we expect: a plain response to our query carrying NOTIMP.
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.NOTIMP)

    # Same expectation over UDP and TCP.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEqual: the assertEquals alias is deprecated in unittest.
        self.assertEqual(receivedResponse, expectedResponse)
268 def testDomainWithoutQTypeIsNotAffected(self
):
270 Basics: NOTIMPL qtype canary
272 dnsdist is configured to reply 'not implemented' for query
273 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
274 We send a A query for "nameAndQtype.tests.powerdns.com."
275 and check that the response is OK.
277 name
= 'nameAndQtype.tests.powerdns.com.'
278 query
= dns
.message
.make_query(name
, 'A', 'IN')
279 response
= dns
.message
.make_response(query
)
280 rrset
= dns
.rrset
.from_text(name
,
285 response
.answer
.append(rrset
)
287 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
288 self
.assertTrue(receivedQuery
)
289 self
.assertTrue(receivedResponse
)
290 receivedQuery
.id = query
.id
291 self
.assertEquals(query
, receivedQuery
)
292 self
.assertEquals(response
, receivedResponse
)
294 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
295 self
.assertTrue(receivedQuery
)
296 self
.assertTrue(receivedResponse
)
297 receivedQuery
.id = query
.id
298 self
.assertEquals(query
, receivedQuery
)
299 self
.assertEquals(response
, receivedResponse
)
301 def testOtherDomainANDQTypeIsNotAffected(self
):
303 Basics: NOTIMPL qname canary
305 dnsdist is configured to reply 'not implemented' for query
306 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
307 We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
308 and check that the response is OK.
310 name
= 'OtherNameAndQtype.tests.powerdns.com.'
311 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
312 response
= dns
.message
.make_response(query
)
313 rrset
= dns
.rrset
.from_text(name
,
317 'nothing to see here')
318 response
.answer
.append(rrset
)
320 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
321 self
.assertTrue(receivedQuery
)
322 self
.assertTrue(receivedResponse
)
323 receivedQuery
.id = query
.id
324 self
.assertEquals(query
, receivedQuery
)
325 self
.assertEquals(response
, receivedResponse
)
327 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
328 self
.assertTrue(receivedQuery
)
329 self
.assertTrue(receivedResponse
)
330 receivedQuery
.id = query
.id
331 self
.assertEquals(query
, receivedQuery
)
332 self
.assertEquals(response
, receivedResponse
)
334 def testWrongResponse(self
):
336 Basics: Unrelated response from the backend
338 The backend send an unrelated answer over UDP, it should
339 be discarded by dnsdist. It could happen if we wrap around
340 maxOutstanding queries too quickly or have more than maxOutstanding
341 queries to a specific backend in the air over UDP,
342 but does not really make sense over TCP.
344 name
= 'query.unrelated.tests.powerdns.com.'
345 unrelatedName
= 'answer.unrelated.tests.powerdns.com.'
346 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
347 unrelatedQuery
= dns
.message
.make_query(unrelatedName
, 'TXT', 'IN')
348 unrelatedResponse
= dns
.message
.make_response(unrelatedQuery
)
349 rrset
= dns
.rrset
.from_text(unrelatedName
,
353 'nothing to see here')
354 unrelatedResponse
.answer
.append(rrset
)
356 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, unrelatedResponse
)
357 self
.assertTrue(receivedQuery
)
358 self
.assertEquals(receivedResponse
, None)
359 receivedQuery
.id = query
.id
360 self
.assertEquals(query
, receivedQuery
)
362 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, unrelatedResponse
)
363 self
.assertTrue(receivedQuery
)
364 self
.assertEquals(receivedResponse
, None)
365 receivedQuery
.id = query
.id
366 self
.assertEquals(query
, receivedQuery
)
def testHeaderOnlyRefused(self):
    """
    Basics: Header-only refused response

    The backend answers with a REFUSED response whose question section
    has been emptied; the test expects to receive that response as-is
    over both UDP and TCP.
    """
    name = 'header-only-refused-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # Build the header-only backend answer: REFUSED with no question.
    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.REFUSED)
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the IDs before comparing the messages.
        # NOTE(review): presumably the ID differs once the query has
        # passed through dnsdist -- confirm.
        receivedQuery.id = query.id
        # assertEqual: the assertEquals alias is deprecated in unittest.
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
def testHeaderOnlyNoErrorResponse(self):
    """
    Basics: Header-only NoError response should be dropped

    The backend answers with a NoError response whose question section
    has been emptied; the test expects no response to reach the client,
    over both UDP and TCP.
    """
    name = 'header-only-noerror-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # Build the header-only backend answer (rcode left untouched).
    response = dns.message.make_response(query)
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the IDs before comparing the messages.
        receivedQuery.id = query.id
        # assertEqual / assertIsNone replace the deprecated assertEquals.
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
def testHeaderOnlyNXDResponse(self):
    """
    Basics: Header-only NXD response should be dropped

    The backend answers with an NXDOMAIN response whose question section
    has been emptied; the test expects no response to reach the client,
    over both UDP and TCP.
    """
    name = 'header-only-nxd-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # Build the header-only backend answer: NXDOMAIN with no question.
    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.NXDOMAIN)
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the IDs before comparing the messages.
        receivedQuery.id = query.id
        # assertEqual / assertIsNone replace the deprecated assertEquals.
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
def testAddActionDNSName(self):
    """
    Basics: test if addAction accepts a DNSName

    Query the name that the configuration passes to addAction() as a
    DNSName and check that the answer is REFUSED (UDP only).
    """
    name = 'dnsname.addaction.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # The answer we expect: a plain response to our query carrying REFUSED.
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
    # assertEqual: the assertEquals alias is deprecated in unittest.
    self.assertEqual(receivedResponse, expectedResponse)
def testAddActionDNSNames(self):
    """
    Basics: test if addAction accepts a table of DNSNames

    The configuration passes a table holding dnsname-table1 and
    dnsname-table2 to addAction(); query each of them and check that
    the answer is REFUSED (UDP only).
    """
    # range(1, 3) yields 1 and 2. The previous range(1, 2) stopped after 1,
    # so the second name of the table was never exercised.
    for i in range(1, 3):
        name = 'dnsname-table{}.addaction.powerdns.com.'.format(i)
        query = dns.message.make_query(name, 'A', 'IN')

        # The answer we expect: a plain response carrying REFUSED.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        # assertEqual: the assertEquals alias is deprecated in unittest.
        self.assertEqual(receivedResponse, expectedResponse)
457 if __name__
== '__main__':