]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class TestBasics(DNSDistTest
):
10 newServer{address="127.0.0.1:%s"}
13 addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
14 mySMN = newSuffixMatchNode()
15 mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
16 addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
17 addAction(makeRule("drop.test.powerdns.com."), DropAction())
18 addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
19 addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
20 block=newDNSName("powerdns.org.")
21 function blockFilter(dq)
22 if(dq.qname:isPartOf(block))
24 print("Blocking *.powerdns.org")
def testDropped(self):
    """
    Basics: Dropped query

    Send an A query for the drop.test.powerdns.com. domain,
    which is dropped by configuration. We expect no response,
    over UDP as well as over TCP.
    """
    name = 'drop.test.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # Same check over both transports, UDP first and TCP second.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertIsNone(receivedResponse)
def testBlockedA(self):
    """
    Basics: Blocked A query

    Send an A query for the powerdns.org domain,
    which is blocked by configuration. We expect no response,
    over UDP as well as over TCP.
    """
    name = 'blockeda.tests.powerdns.org.'
    query = dns.message.make_query(name, 'A', 'IN')

    # Same check over both transports, UDP first and TCP second.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertIsNone(receivedResponse)
63 def testAWithECS(self
):
# Builds an A query with EDNS enabled and a ClientSubnetOption for 1.2.3.4,
# plus a response carrying one rrset, then sends the pair through
# sendUDPQuery and sendTCPQuery. After copying the query id onto the
# returned query, both the query and the response must compare equal to
# what was sent.
65 Basics: A query with an ECS value
67 name
= 'awithecs.tests.powerdns.com.'
68 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
69 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
])
70 response
= dns
.message
.make_response(query
)
# NOTE(review): the argument list of dns.rrset.from_text() is cut off after
# `name,` in this extract (embedded line numbers jump from 71 to 77);
# restore the missing arguments from the upstream file.
71 rrset
= dns
.rrset
.from_text(name
,
77 response
.answer
.append(rrset
)
79 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
# The ids are aligned before comparing; presumably the id is rewritten in
# transit -- TODO confirm against sendUDPQuery / dnsdist behaviour.
80 receivedQuery
.id = query
.id
81 self
.assertEquals(query
, receivedQuery
)
82 self
.assertEquals(response
, receivedResponse
)
84 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
85 receivedQuery
.id = query
.id
86 self
.assertEquals(query
, receivedQuery
)
87 self
.assertEquals(response
, receivedResponse
)
89 def testSimpleA(self
):
# Builds an A query with EDNS explicitly disabled and a response carrying
# one rrset, then sends the pair through sendUDPQuery and sendTCPQuery.
# Both returned objects must be truthy and, once the query id is aligned,
# equal to what was sent.
91 Basics: A query without EDNS
93 name
= 'simplea.tests.powerdns.com.'
94 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
95 response
= dns
.message
.make_response(query
)
# NOTE(review): the argument list of dns.rrset.from_text() is cut off after
# `name,` in this extract (embedded line numbers jump from 96 to 101);
# restore the missing arguments from the upstream file.
96 rrset
= dns
.rrset
.from_text(name
,
101 response
.answer
.append(rrset
)
103 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
104 self
.assertTrue(receivedQuery
)
105 self
.assertTrue(receivedResponse
)
106 receivedQuery
.id = query
.id
107 self
.assertEquals(query
, receivedQuery
)
108 self
.assertEquals(response
, receivedResponse
)
110 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
111 self
.assertTrue(receivedQuery
)
112 self
.assertTrue(receivedResponse
)
113 receivedQuery
.id = query
.id
114 self
.assertEquals(query
, receivedQuery
)
115 self
.assertEquals(response
, receivedResponse
)
117 def testAnyIsTruncated(self
):
# Two phases: over UDP the ANY query is sent with response=None and
# useQueue=False, and the reply must equal a bare response to the query
# with the TC flag set. Over TCP a response carrying one rrset is supplied
# and must come back unchanged, together with the (id-aligned) query.
119 Basics: Truncate ANY query
121 dnsdist is configured to reply with TC to ANY queries,
122 send an ANY query and check the result.
123 It should be truncated over UDP, not over TCP.
125 name
= 'any.tests.powerdns.com.'
126 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
127 expectedResponse
= dns
.message
.make_response(query
)
128 expectedResponse
.flags |
= dns
.flags
.TC
130 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
131 self
.assertEquals(receivedResponse
, expectedResponse
)
133 response
= dns
.message
.make_response(query
)
# NOTE(review): the argument list of dns.rrset.from_text() is cut off after
# `name,` in this extract (embedded line numbers jump from 134 to 140);
# restore the missing arguments from the upstream file.
134 rrset
= dns
.rrset
.from_text(name
,
140 response
.answer
.append(rrset
)
142 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
143 self
.assertTrue(receivedQuery
)
144 self
.assertTrue(receivedResponse
)
145 receivedQuery
.id = query
.id
146 self
.assertEquals(query
, receivedQuery
)
147 self
.assertEquals(receivedResponse
, response
)
149 def testTruncateTC(self
):
# Supplies a response that has the TC flag set AND an answer rrset, sends it
# through sendUDPQuery only, and checks that the reply keeps the flags and
# question but has empty answer, authority and additional sections.
153 dnsdist is configured to truncate TC (default),
154 we make the backend send responses
155 with TC set and additional content,
156 and check that the received response has been fixed.
158 name
= 'atruncatetc.tests.powerdns.com.'
159 query
= dns
.message
.make_query(name
, 'A', 'IN')
160 response
= dns
.message
.make_response(query
)
# NOTE(review): the argument list of dns.rrset.from_text() is cut off after
# `name,` in this extract (embedded line numbers jump from 161 to 167);
# restore the missing arguments from the upstream file.
161 rrset
= dns
.rrset
.from_text(name
,
167 response
.answer
.append(rrset
)
168 response
.flags |
= dns
.flags
.TC
170 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
171 receivedQuery
.id = query
.id
172 self
.assertEquals(query
, receivedQuery
)
173 self
.assertEquals(response
.flags
, receivedResponse
.flags
)
174 self
.assertEquals(response
.question
, receivedResponse
.question
)
# The answer section sent must NOT survive: the reply is expected to be
# stripped down to header and question.
175 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
176 self
.assertEquals(len(receivedResponse
.answer
), 0)
177 self
.assertEquals(len(receivedResponse
.authority
), 0)
178 self
.assertEquals(len(receivedResponse
.additional
), 0)
def testRegexReturnsRefused(self):
    """
    Basics: Refuse query matching regex

    dnsdist is configured to reply 'refused' for query
    matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
    We send a query for evil4242.regex.tests.powerdns.com.
    and check that the response is "refused", over UDP and over TCP.
    """
    name = 'evil4242.regex.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    # Same check over both transports, UDP first and TCP second.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertEqual(receivedResponse, expectedResponse)
def testDomainAndQTypeReturnsNotImplemented(self):
    """
    Basics: NOTIMPL for specific name and qtype

    dnsdist is configured to reply 'not implemented' for query
    matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
    We send a TXT query for "nameAndQtype.tests.powerdns.com."
    and check that the response is 'not implemented',
    over UDP and over TCP.
    """
    name = 'nameAndQtype.tests.powerdns.com.'
    query = dns.message.make_query(name, 'TXT', 'IN')
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.NOTIMP)

    # Same check over both transports, UDP first and TCP second.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertEqual(receivedResponse, expectedResponse)
220 def testDomainWithoutQTypeIsNotAffected(self
):
# Canary for the name-AND-qtype rule: queries the same name with qtype A
# instead of TXT, supplies a response carrying one rrset, and checks over
# sendUDPQuery and sendTCPQuery that the query and response come back
# equal to what was sent (after aligning the query id).
222 Basics: NOTIMPL qtype canary
224 dnsdist is configured to reply 'not implemented' for query
225 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
226 We send a A query for "nameAndQtype.tests.powerdns.com."
227 and check that the response is OK.
229 name
= 'nameAndQtype.tests.powerdns.com.'
230 query
= dns
.message
.make_query(name
, 'A', 'IN')
231 response
= dns
.message
.make_response(query
)
# NOTE(review): the argument list of dns.rrset.from_text() is cut off after
# `name,` in this extract (embedded line numbers jump from 232 to 237);
# restore the missing arguments from the upstream file.
232 rrset
= dns
.rrset
.from_text(name
,
237 response
.answer
.append(rrset
)
239 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
240 self
.assertTrue(receivedQuery
)
241 self
.assertTrue(receivedResponse
)
242 receivedQuery
.id = query
.id
243 self
.assertEquals(query
, receivedQuery
)
244 self
.assertEquals(response
, receivedResponse
)
246 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
247 self
.assertTrue(receivedQuery
)
248 self
.assertTrue(receivedResponse
)
249 receivedQuery
.id = query
.id
250 self
.assertEquals(query
, receivedQuery
)
251 self
.assertEquals(response
, receivedResponse
)
253 def testOtherDomainANDQTypeIsNotAffected(self
):
# Canary for the name-AND-qtype rule: sends a TXT query for a different
# name, supplies a response carrying one rrset, and checks over
# sendUDPQuery and sendTCPQuery that the query and response come back
# equal to what was sent (after aligning the query id).
255 Basics: NOTIMPL qname canary
257 dnsdist is configured to reply 'not implemented' for query
258 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
259 We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
260 and check that the response is OK.
262 name
= 'OtherNameAndQtype.tests.powerdns.com.'
263 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
264 response
= dns
.message
.make_response(query
)
# NOTE(review): the middle arguments of dns.rrset.from_text() are missing
# from this extract (embedded line numbers jump from 265 to 269); only the
# first argument and the final text value survive. Restore from upstream.
265 rrset
= dns
.rrset
.from_text(name
,
269 'nothing to see here')
270 response
.answer
.append(rrset
)
272 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
273 self
.assertTrue(receivedQuery
)
274 self
.assertTrue(receivedResponse
)
275 receivedQuery
.id = query
.id
276 self
.assertEquals(query
, receivedQuery
)
277 self
.assertEquals(response
, receivedResponse
)
279 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
280 self
.assertTrue(receivedQuery
)
281 self
.assertTrue(receivedResponse
)
282 receivedQuery
.id = query
.id
283 self
.assertEquals(query
, receivedQuery
)
284 self
.assertEquals(response
, receivedResponse
)
286 def testWrongResponse(self
):
# Sends a TXT query for one name while supplying a response built for a
# different name. Over both sendUDPQuery and sendTCPQuery the query must
# come back (equal after aligning the id) and the received response must
# be None.
288 Basics: Unrelated response from the backend
290 The backend send an unrelated answer over UDP, it should
291 be discarded by dnsdist. It could happen if we wrap around
292 maxOutstanding queries too quickly or have more than maxOutstanding
293 queries to a specific backend in the air over UDP,
294 but does not really make sense over TCP.
296 name
= 'query.unrelated.tests.powerdns.com.'
297 unrelatedName
= 'answer.unrelated.tests.powerdns.com.'
298 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
299 unrelatedQuery
= dns
.message
.make_query(unrelatedName
, 'TXT', 'IN')
300 unrelatedResponse
= dns
.message
.make_response(unrelatedQuery
)
# NOTE(review): the middle arguments of dns.rrset.from_text() are missing
# from this extract (embedded line numbers jump from 301 to 305); only the
# first argument and the final text value survive. Restore from upstream.
301 rrset
= dns
.rrset
.from_text(unrelatedName
,
305 'nothing to see here')
306 unrelatedResponse
.answer
.append(rrset
)
308 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, unrelatedResponse
)
309 self
.assertTrue(receivedQuery
)
310 self
.assertEquals(receivedResponse
, None)
311 receivedQuery
.id = query
.id
312 self
.assertEquals(query
, receivedQuery
)
314 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, unrelatedResponse
)
315 self
.assertTrue(receivedQuery
)
316 self
.assertEquals(receivedResponse
, None)
317 receivedQuery
.id = query
.id
318 self
.assertEquals(query
, receivedQuery
)
def testHeaderOnlyRefused(self):
    """
    Basics: Header-only refused response

    The supplied response has rcode REFUSED and an empty question
    section. We check, over UDP and over TCP, that the query is
    passed on and that this response comes back unchanged.
    """
    name = 'header-only-refused-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.REFUSED)
    # Strip the question so that only the header remains.
    response.question = []

    # Same check over both transports, UDP first and TCP second.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the ids before comparing the two queries.
        receivedQuery.id = query.id
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
def testHeaderOnlyNoErrorResponse(self):
    """
    Basics: Header-only NoError response should be dropped

    The supplied response keeps the default rcode but has an empty
    question section. We check, over UDP and over TCP, that the query
    is passed on and that no response is received.
    """
    name = 'header-only-noerror-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    # Strip the question so that only the header remains.
    response.question = []

    # Same check over both transports, UDP first and TCP second.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the ids before comparing the two queries.
        receivedQuery.id = query.id
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
def testHeaderOnlyNXDResponse(self):
    """
    Basics: Header-only NXD response should be dropped

    The supplied response has rcode NXDOMAIN and an empty question
    section. We check, over UDP and over TCP, that the query is
    passed on and that no response is received.
    """
    name = 'header-only-nxd-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.NXDOMAIN)
    # Strip the question so that only the header remains.
    response.question = []

    # Same check over both transports, UDP first and TCP second.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the ids before comparing the two queries.
        receivedQuery.id = query.id
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
def testAddActionDNSName(self):
    """
    Basics: test if addAction accepts a DNSName

    Send an A query for dnsname.addaction.powerdns.com. over UDP
    and check that the response is 'refused'.
    """
    name = 'dnsname.addaction.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
    # assertEquals is a deprecated alias (removed in Python 3.12).
    self.assertEqual(receivedResponse, expectedResponse)
def testAddActionDNSNames(self):
    """
    Basics: test if addAction accepts a table of DNSNames

    Send an A query over UDP for each name of the table registered in
    the configuration and check that every response is 'refused'.
    """
    # The configuration registers dnsname-table1 AND dnsname-table2.
    # range()'s stop value is exclusive, so it must be 3 to cover both;
    # the previous range(1,2) only ever exercised dnsname-table1.
    names = ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1, 3)]
    for name in names:
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertEqual(receivedResponse, expectedResponse)
409 if __name__
== '__main__':