# Source: regression-tests.dnsdist/test_Basics.py (thirdparty/pdns.git, as rendered by git.ipfire.org)
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class TestBasics(DNSDistTest
):
10 newServer{address="127.0.0.1:%s"}
13 addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
14 mySMN = newSuffixMatchNode()
15 mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
16 addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
17 addAction(makeRule("drop.test.powerdns.com."), DropAction())
18 addAction(AndRule({QTypeRule(dnsdist.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
19 addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
20 addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
21 block=newDNSName("powerdns.org.")
22 function blockFilter(dq)
23 if(dq.qname:isPartOf(block))
25 print("Blocking *.powerdns.org")
def testDropped(self):
    """
    Basics: Drop A query

    Send an A query for the drop.test.powerdns.com. domain,
    which is dropped by configuration. We expect no response,
    over UDP as well as over TCP.
    """
    name = 'drop.test.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # Same expectation on both transports; no canned backend response is
    # supplied (response=None) and the queue is bypassed (useQueue=False).
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)
def testBlockedA(self):
    """
    Basics: Blocked A query

    Send an A query for a name under the powerdns.org domain,
    which is blocked by configuration. We expect no response,
    over UDP as well as over TCP.
    """
    name = 'blockeda.tests.powerdns.org.'
    query = dns.message.make_query(name, 'A', 'IN')

    # Same expectation on both transports; no canned backend response is
    # supplied (response=None) and the queue is bypassed (useQueue=False).
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)
# testAWithECS: builds an A query carrying a ClientSubnetOption('1.2.3.4')
# EDNS option plus a matching response, sends the pair over UDP and then TCP,
# and asserts receivedQuery == query (after copying the id across) and
# receivedResponse == response.
# NOTE(review): this listing lost its indentation, its docstring delimiters
# and the remaining arguments of dns.rrset.from_text(name, ...); the code is
# therefore left untouched -- restore the missing lines from the upstream file.
# NOTE(review): assertEquals is a deprecated alias of assertEqual (removed in
# Python 3.12) -- switch once the block is restored.
64 def testAWithECS(self
):
66 Basics: A query with an ECS value
68 name
= 'awithecs.tests.powerdns.com.'
69 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
70 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
])
71 response
= dns
.message
.make_response(query
)
72 rrset
= dns
.rrset
.from_text(name
,
78 response
.answer
.append(rrset
)
80 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
81 receivedQuery
.id = query
.id
82 self
.assertEquals(query
, receivedQuery
)
83 self
.assertEquals(response
, receivedResponse
)
85 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
86 receivedQuery
.id = query
.id
87 self
.assertEquals(query
, receivedQuery
)
88 self
.assertEquals(response
, receivedResponse
)
# testSimpleA: builds an A query with use_edns=False plus a matching
# response, sends the pair over UDP and then TCP, and asserts that both a
# query and a response were received, that receivedQuery == query (after
# copying the id across) and that receivedResponse == response.
# NOTE(review): this listing lost its indentation, its docstring delimiters
# and the remaining arguments of dns.rrset.from_text(name, ...); the code is
# therefore left untouched -- restore the missing lines from the upstream file.
# NOTE(review): assertEquals is a deprecated alias of assertEqual (removed in
# Python 3.12) -- switch once the block is restored.
90 def testSimpleA(self
):
92 Basics: A query without EDNS
94 name
= 'simplea.tests.powerdns.com.'
95 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
96 response
= dns
.message
.make_response(query
)
97 rrset
= dns
.rrset
.from_text(name
,
102 response
.answer
.append(rrset
)
104 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
105 self
.assertTrue(receivedQuery
)
106 self
.assertTrue(receivedResponse
)
107 receivedQuery
.id = query
.id
108 self
.assertEquals(query
, receivedQuery
)
109 self
.assertEquals(response
, receivedResponse
)
111 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
112 self
.assertTrue(receivedQuery
)
113 self
.assertTrue(receivedResponse
)
114 receivedQuery
.id = query
.id
115 self
.assertEquals(query
, receivedQuery
)
116 self
.assertEquals(response
, receivedResponse
)
# testAnyIsTruncated: sends an ANY query. Over UDP (response=None,
# useQueue=False) the expected reply is make_response(query) with the TC flag
# set; over TCP a backend response is supplied and the test asserts it is
# received unchanged, together with the original query (id copied across).
# NOTE(review): the configuration line that makes dnsdist answer ANY with TC
# is not visible in this listing -- confirm against the upstream config.
# NOTE(review): this listing lost its indentation, its docstring delimiters
# and the remaining arguments of dns.rrset.from_text(name, ...); the code is
# therefore left untouched -- restore the missing lines from the upstream file.
# NOTE(review): assertEquals is a deprecated alias of assertEqual (removed in
# Python 3.12) -- switch once the block is restored.
118 def testAnyIsTruncated(self
):
120 Basics: Truncate ANY query
122 dnsdist is configured to reply with TC to ANY queries,
123 send an ANY query and check the result.
124 It should be truncated over UDP, not over TCP.
126 name
= 'any.tests.powerdns.com.'
127 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
128 expectedResponse
= dns
.message
.make_response(query
)
129 expectedResponse
.flags |
= dns
.flags
.TC
131 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
132 self
.assertEquals(receivedResponse
, expectedResponse
)
134 response
= dns
.message
.make_response(query
)
135 rrset
= dns
.rrset
.from_text(name
,
141 response
.answer
.append(rrset
)
143 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
144 self
.assertTrue(receivedQuery
)
145 self
.assertTrue(receivedResponse
)
146 receivedQuery
.id = query
.id
147 self
.assertEquals(query
, receivedQuery
)
148 self
.assertEquals(receivedResponse
, response
)
# testTruncateTC: the backend response carries an answer rrset and has the TC
# flag set. Over UDP only, the test asserts that the received query matches
# (id copied across), that flags and question section of the received
# response equal those of the backend response, that the answer section
# differs, and that answer/authority/additional are all empty.
# NOTE(review): this listing lost its indentation, its docstring delimiters
# and the remaining arguments of dns.rrset.from_text(name, ...); the code is
# therefore left untouched -- restore the missing lines from the upstream file.
# NOTE(review): assertEquals is a deprecated alias of assertEqual (removed in
# Python 3.12) -- switch once the block is restored.
150 def testTruncateTC(self
):
154 dnsdist is configured to truncate TC (default),
155 we make the backend send responses
156 with TC set and additional content,
157 and check that the received response has been fixed.
159 name
= 'atruncatetc.tests.powerdns.com.'
160 query
= dns
.message
.make_query(name
, 'A', 'IN')
161 response
= dns
.message
.make_response(query
)
162 rrset
= dns
.rrset
.from_text(name
,
168 response
.answer
.append(rrset
)
169 response
.flags |
= dns
.flags
.TC
171 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
172 receivedQuery
.id = query
.id
173 self
.assertEquals(query
, receivedQuery
)
174 self
.assertEquals(response
.flags
, receivedResponse
.flags
)
175 self
.assertEquals(response
.question
, receivedResponse
.question
)
176 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
177 self
.assertEquals(len(receivedResponse
.answer
), 0)
178 self
.assertEquals(len(receivedResponse
.authority
), 0)
179 self
.assertEquals(len(receivedResponse
.additional
), 0)
def testRegexReturnsRefused(self):
    r"""
    Basics: Refuse query matching regex

    dnsdist is configured to reply 'refused' for query
    matching "evil[0-9]{4,}\.regex\.tests\.powerdns\.com$".
    We send a query for evil4242.regex.tests.powerdns.com.
    and check that the response is "refused", over UDP and TCP.
    """
    name = 'evil4242.regex.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    # No canned backend response is supplied (response=None): the REFUSED
    # answer is expected to come from the rule itself.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
# testQNameReturnsSpoofed: sends an A query with the RD flag cleared and no
# backend response (response=None, useQueue=False), and asserts over UDP and
# TCP that the reply equals a NOERROR response carrying the expected rrset.
# NOTE(review): the assignment of `name` and the remaining arguments of
# dns.rrset.from_text(name, ...) are missing from this listing (presumably
# 'ds9a.nl.' and an A record for 1.2.3.4, matching the SpoofAction rule --
# confirm against the upstream file); the code is therefore left untouched.
# NOTE(review): assertEquals is a deprecated alias of assertEqual (removed in
# Python 3.12) -- switch once the block is restored.
201 def testQNameReturnsSpoofed(self
):
203 Basics: test QNameRule and Spoof
205 dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
208 query
= dns
.message
.make_query(name
, 'A', 'IN')
209 query
.flags
&= ~dns
.flags
.RD
210 expectedResponse
= dns
.message
.make_response(query
)
211 expectedResponse
.set_rcode(dns
.rcode
.NOERROR
)
212 rrset
= dns
.rrset
.from_text(name
,
217 expectedResponse
.answer
.append(rrset
)
219 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
220 self
.assertEquals(receivedResponse
, expectedResponse
)
222 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
223 self
.assertEquals(receivedResponse
, expectedResponse
)
def testDomainAndQTypeReturnsNotImplemented(self):
    """
    Basics: NOTIMPL for specific name and qtype

    dnsdist is configured to reply 'not implemented' for query
    matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
    We send a TXT query for "nameAndQtype.tests.powerdns.com."
    and check that the response is 'not implemented', over UDP and TCP.
    """
    name = 'nameAndQtype.tests.powerdns.com.'
    query = dns.message.make_query(name, 'TXT', 'IN')
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.NOTIMP)

    # No canned backend response is supplied (response=None): the NOTIMP
    # answer is expected to come from the rule itself.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
# testDomainWithoutQTypeIsNotAffected: canary for the name+qtype NOTIMP rule.
# Sends an A (not TXT) query for nameAndQtype.tests.powerdns.com. with a
# backend response, over UDP and then TCP, and asserts that query and
# response are both received and equal what was sent (id copied across).
# NOTE(review): this listing lost its indentation, its docstring delimiters
# and the remaining arguments of dns.rrset.from_text(name, ...); the code is
# therefore left untouched -- restore the missing lines from the upstream file.
# NOTE(review): assertEquals is a deprecated alias of assertEqual (removed in
# Python 3.12) -- switch once the block is restored.
246 def testDomainWithoutQTypeIsNotAffected(self
):
248 Basics: NOTIMPL qtype canary
250 dnsdist is configured to reply 'not implemented' for query
251 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
252 We send a A query for "nameAndQtype.tests.powerdns.com."
253 and check that the response is OK.
255 name
= 'nameAndQtype.tests.powerdns.com.'
256 query
= dns
.message
.make_query(name
, 'A', 'IN')
257 response
= dns
.message
.make_response(query
)
258 rrset
= dns
.rrset
.from_text(name
,
263 response
.answer
.append(rrset
)
265 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
266 self
.assertTrue(receivedQuery
)
267 self
.assertTrue(receivedResponse
)
268 receivedQuery
.id = query
.id
269 self
.assertEquals(query
, receivedQuery
)
270 self
.assertEquals(response
, receivedResponse
)
272 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
273 self
.assertTrue(receivedQuery
)
274 self
.assertTrue(receivedResponse
)
275 receivedQuery
.id = query
.id
276 self
.assertEquals(query
, receivedQuery
)
277 self
.assertEquals(response
, receivedResponse
)
# testOtherDomainANDQTypeIsNotAffected: canary for the name+qtype NOTIMP
# rule. Sends a TXT query for a different name
# (OtherNameAndQtype.tests.powerdns.com.) with a backend response, over UDP
# and then TCP, and asserts that query and response are both received and
# equal what was sent (id copied across).
# NOTE(review): this listing lost its indentation, its docstring delimiters
# and the middle arguments of dns.rrset.from_text(name, ..., 'nothing to see
# here'); the code is therefore left untouched -- restore the missing lines
# from the upstream file.
# NOTE(review): assertEquals is a deprecated alias of assertEqual (removed in
# Python 3.12) -- switch once the block is restored.
279 def testOtherDomainANDQTypeIsNotAffected(self
):
281 Basics: NOTIMPL qname canary
283 dnsdist is configured to reply 'not implemented' for query
284 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
285 We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
286 and check that the response is OK.
288 name
= 'OtherNameAndQtype.tests.powerdns.com.'
289 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
290 response
= dns
.message
.make_response(query
)
291 rrset
= dns
.rrset
.from_text(name
,
295 'nothing to see here')
296 response
.answer
.append(rrset
)
298 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
299 self
.assertTrue(receivedQuery
)
300 self
.assertTrue(receivedResponse
)
301 receivedQuery
.id = query
.id
302 self
.assertEquals(query
, receivedQuery
)
303 self
.assertEquals(response
, receivedResponse
)
305 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
306 self
.assertTrue(receivedQuery
)
307 self
.assertTrue(receivedResponse
)
308 receivedQuery
.id = query
.id
309 self
.assertEquals(query
, receivedQuery
)
310 self
.assertEquals(response
, receivedResponse
)
# testWrongResponse: sends a TXT query for query.unrelated.tests.powerdns.com.
# while the backend answers with a response built for a different name
# (answer.unrelated.tests.powerdns.com.). Over UDP and then TCP the test
# asserts that the query was received and matches (id copied across) and
# that no response (None) came back.
# NOTE(review): this listing lost its indentation, its docstring delimiters
# and the middle arguments of dns.rrset.from_text(unrelatedName, ...,
# 'nothing to see here'); the code is therefore left untouched -- restore the
# missing lines from the upstream file.
# NOTE(review): assertEquals is a deprecated alias of assertEqual (removed in
# Python 3.12) -- switch once the block is restored.
312 def testWrongResponse(self
):
314 Basics: Unrelated response from the backend
316 The backend send an unrelated answer over UDP, it should
317 be discarded by dnsdist. It could happen if we wrap around
318 maxOutstanding queries too quickly or have more than maxOutstanding
319 queries to a specific backend in the air over UDP,
320 but does not really make sense over TCP.
322 name
= 'query.unrelated.tests.powerdns.com.'
323 unrelatedName
= 'answer.unrelated.tests.powerdns.com.'
324 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
325 unrelatedQuery
= dns
.message
.make_query(unrelatedName
, 'TXT', 'IN')
326 unrelatedResponse
= dns
.message
.make_response(unrelatedQuery
)
327 rrset
= dns
.rrset
.from_text(unrelatedName
,
331 'nothing to see here')
332 unrelatedResponse
.answer
.append(rrset
)
334 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, unrelatedResponse
)
335 self
.assertTrue(receivedQuery
)
336 self
.assertEquals(receivedResponse
, None)
337 receivedQuery
.id = query
.id
338 self
.assertEquals(query
, receivedQuery
)
340 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, unrelatedResponse
)
341 self
.assertTrue(receivedQuery
)
342 self
.assertEquals(receivedResponse
, None)
343 receivedQuery
.id = query
.id
344 self
.assertEquals(query
, receivedQuery
)
def testHeaderOnlyRefused(self):
    """
    Basics: Header-only refused response

    The backend answers with a REFUSED response whose question section
    has been emptied. We expect that exact response to be received,
    over UDP as well as over TCP.
    """
    name = 'header-only-refused-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.REFUSED)
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the ids before comparing (presumably the id seen by the
        # backend is not the one the client picked -- confirm in dnsdisttests).
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
def testHeaderOnlyNoErrorResponse(self):
    """
    Basics: Header-only NoError response should be dropped

    The backend answers with a response whose question section has been
    emptied and whose rcode is left as created by make_response().
    We expect no response to be received, over UDP as well as over TCP.
    """
    name = 'header-only-noerror-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the ids before comparing (presumably the id seen by the
        # backend is not the one the client picked -- confirm in dnsdisttests).
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
def testHeaderOnlyNXDResponse(self):
    """
    Basics: Header-only NXD response should be dropped

    The backend answers with an NXDOMAIN response whose question section
    has been emptied. We expect no response to be received,
    over UDP as well as over TCP.
    """
    name = 'header-only-nxd-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.NXDOMAIN)
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the ids before comparing (presumably the id seen by the
        # backend is not the one the client picked -- confirm in dnsdisttests).
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
def testAddActionDNSName(self):
    """
    Basics: test if addAction accepts a DNSName

    The configuration registers dnsname.addaction.powerdns.com. via
    addAction(newDNSName(...), RCodeAction(dnsdist.REFUSED)); we send an
    A query for that name over UDP and expect a REFUSED response.
    """
    name = 'dnsname.addaction.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
    self.assertEqual(receivedResponse, expectedResponse)
def testAddActionDNSNames(self):
    """
    Basics: test if addAction accepts a table of DNSNames

    The configuration registers dnsname-table1.addaction.powerdns.com. and
    dnsname-table2.addaction.powerdns.com. in a single addAction() call with
    RCodeAction(dnsdist.REFUSED); we send an A query for each name over UDP
    and expect a REFUSED response every time.
    """
    # range(1, 3) covers both configured names; the former range(1, 2)
    # stopped before 2, so dnsname-table2 was never exercised.
    for name in ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1, 3)]:
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
435 if __name__
== '__main__':