]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class TestBasics(DNSDistTest
):
10 newServer{address="127.0.0.1:%s"}
12 addAction(AndRule{QTypeRule(dnsdist.ANY), TCPRule(false)}, TCAction())
13 addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
14 mySMN = newSuffixMatchNode()
15 mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
16 addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
17 addAction(makeRule("drop.test.powerdns.com."), DropAction())
18 addAction(AndRule({QTypeRule(dnsdist.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
19 addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
20 addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
23 def testDropped(self
):
27 Send an A query for drop.test.powerdns.com. domain,
28 which is dropped by configuration. We expect
31 name
= 'drop.test.powerdns.com.'
32 query
= dns
.message
.make_query(name
, 'A', 'IN')
33 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
34 self
.assertEquals(receivedResponse
, None)
36 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
37 self
.assertEquals(receivedResponse
, None)
39 def testAWithECS(self
):
41 Basics: A query with an ECS value
43 name
= 'awithecs.tests.powerdns.com.'
44 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
45 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
])
46 response
= dns
.message
.make_response(query
)
47 rrset
= dns
.rrset
.from_text(name
,
53 response
.answer
.append(rrset
)
55 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
56 receivedQuery
.id = query
.id
57 self
.assertEquals(query
, receivedQuery
)
58 self
.assertEquals(response
, receivedResponse
)
60 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
61 receivedQuery
.id = query
.id
62 self
.assertEquals(query
, receivedQuery
)
63 self
.assertEquals(response
, receivedResponse
)
65 def testSimpleA(self
):
67 Basics: A query without EDNS
69 name
= 'simplea.tests.powerdns.com.'
70 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
71 response
= dns
.message
.make_response(query
)
72 rrset
= dns
.rrset
.from_text(name
,
77 response
.answer
.append(rrset
)
79 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
80 self
.assertTrue(receivedQuery
)
81 self
.assertTrue(receivedResponse
)
82 receivedQuery
.id = query
.id
83 self
.assertEquals(query
, receivedQuery
)
84 self
.assertEquals(response
, receivedResponse
)
86 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
87 self
.assertTrue(receivedQuery
)
88 self
.assertTrue(receivedResponse
)
89 receivedQuery
.id = query
.id
90 self
.assertEquals(query
, receivedQuery
)
91 self
.assertEquals(response
, receivedResponse
)
93 def testAnyIsTruncated(self
):
95 Basics: Truncate ANY query
97 dnsdist is configured to reply with TC to ANY queries,
98 send an ANY query and check the result.
99 It should be truncated over UDP, not over TCP.
101 name
= 'any.tests.powerdns.com.'
102 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
103 expectedResponse
= dns
.message
.make_response(query
)
104 expectedResponse
.flags |
= dns
.flags
.TC
106 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
107 self
.assertEquals(receivedResponse
, expectedResponse
)
109 response
= dns
.message
.make_response(query
)
110 rrset
= dns
.rrset
.from_text(name
,
116 response
.answer
.append(rrset
)
118 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
119 self
.assertTrue(receivedQuery
)
120 self
.assertTrue(receivedResponse
)
121 receivedQuery
.id = query
.id
122 self
.assertEquals(query
, receivedQuery
)
123 self
.assertEquals(receivedResponse
, response
)
125 def testTruncateTC(self
):
129 dnsdist is configured to truncate TC (default),
130 we make the backend send responses
131 with TC set and additional content,
132 and check that the received response has been fixed.
134 name
= 'atruncatetc.tests.powerdns.com.'
135 query
= dns
.message
.make_query(name
, 'A', 'IN')
136 response
= dns
.message
.make_response(query
)
137 rrset
= dns
.rrset
.from_text(name
,
143 response
.answer
.append(rrset
)
144 response
.flags |
= dns
.flags
.TC
146 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
147 receivedQuery
.id = query
.id
148 self
.assertEquals(query
, receivedQuery
)
149 self
.assertEquals(response
.flags
, receivedResponse
.flags
)
150 self
.assertEquals(response
.question
, receivedResponse
.question
)
151 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
152 self
.assertEquals(len(receivedResponse
.answer
), 0)
153 self
.assertEquals(len(receivedResponse
.authority
), 0)
154 self
.assertEquals(len(receivedResponse
.additional
), 0)
156 def testRegexReturnsRefused(self
):
158 Basics: Refuse query matching regex
160 dnsdist is configured to reply 'refused' for query
161 matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
162 We send a query for evil4242.powerdns.com
163 and check that the response is "refused".
165 name
= 'evil4242.regex.tests.powerdns.com.'
166 query
= dns
.message
.make_query(name
, 'A', 'IN')
167 expectedResponse
= dns
.message
.make_response(query
)
168 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
170 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
171 self
.assertEquals(receivedResponse
, expectedResponse
)
173 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
174 self
.assertEquals(receivedResponse
, expectedResponse
)
176 def testQNameReturnsSpoofed(self
):
178 Basics: test QNameRule and Spoof
180 dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
183 query
= dns
.message
.make_query(name
, 'A', 'IN')
184 query
.flags
&= ~dns
.flags
.RD
185 expectedResponse
= dns
.message
.make_response(query
)
186 expectedResponse
.set_rcode(dns
.rcode
.NOERROR
)
187 rrset
= dns
.rrset
.from_text(name
,
192 expectedResponse
.answer
.append(rrset
)
194 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
195 self
.assertEquals(receivedResponse
, expectedResponse
)
197 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
198 self
.assertEquals(receivedResponse
, expectedResponse
)
201 def testDomainAndQTypeReturnsNotImplemented(self
):
203 Basics: NOTIMPL for specific name and qtype
205 dnsdist is configured to reply 'not implemented' for query
206 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
207 We send a TXT query for "nameAndQtype.powerdns.com."
208 and check that the response is 'not implemented'.
210 name
= 'nameAndQtype.tests.powerdns.com.'
211 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
212 expectedResponse
= dns
.message
.make_response(query
)
213 expectedResponse
.set_rcode(dns
.rcode
.NOTIMP
)
215 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
216 self
.assertEquals(receivedResponse
, expectedResponse
)
218 (_
, receivedResponse
) = self
.sendTCPQuery(query
, response
=None, useQueue
=False)
219 self
.assertEquals(receivedResponse
, expectedResponse
)
221 def testDomainWithoutQTypeIsNotAffected(self
):
223 Basics: NOTIMPL qtype canary
225 dnsdist is configured to reply 'not implemented' for query
226 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
227 We send a A query for "nameAndQtype.tests.powerdns.com."
228 and check that the response is OK.
230 name
= 'nameAndQtype.tests.powerdns.com.'
231 query
= dns
.message
.make_query(name
, 'A', 'IN')
232 response
= dns
.message
.make_response(query
)
233 rrset
= dns
.rrset
.from_text(name
,
238 response
.answer
.append(rrset
)
240 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
241 self
.assertTrue(receivedQuery
)
242 self
.assertTrue(receivedResponse
)
243 receivedQuery
.id = query
.id
244 self
.assertEquals(query
, receivedQuery
)
245 self
.assertEquals(response
, receivedResponse
)
247 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
248 self
.assertTrue(receivedQuery
)
249 self
.assertTrue(receivedResponse
)
250 receivedQuery
.id = query
.id
251 self
.assertEquals(query
, receivedQuery
)
252 self
.assertEquals(response
, receivedResponse
)
254 def testOtherDomainANDQTypeIsNotAffected(self
):
256 Basics: NOTIMPL qname canary
258 dnsdist is configured to reply 'not implemented' for query
259 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
260 We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
261 and check that the response is OK.
263 name
= 'OtherNameAndQtype.tests.powerdns.com.'
264 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
265 response
= dns
.message
.make_response(query
)
266 rrset
= dns
.rrset
.from_text(name
,
270 'nothing to see here')
271 response
.answer
.append(rrset
)
273 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
274 self
.assertTrue(receivedQuery
)
275 self
.assertTrue(receivedResponse
)
276 receivedQuery
.id = query
.id
277 self
.assertEquals(query
, receivedQuery
)
278 self
.assertEquals(response
, receivedResponse
)
280 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
281 self
.assertTrue(receivedQuery
)
282 self
.assertTrue(receivedResponse
)
283 receivedQuery
.id = query
.id
284 self
.assertEquals(query
, receivedQuery
)
285 self
.assertEquals(response
, receivedResponse
)
287 def testWrongResponse(self
):
289 Basics: Unrelated response from the backend
291 The backend send an unrelated answer over UDP, it should
292 be discarded by dnsdist. It could happen if we wrap around
293 maxOutstanding queries too quickly or have more than maxOutstanding
294 queries to a specific backend in the air over UDP,
295 but does not really make sense over TCP.
297 name
= 'query.unrelated.tests.powerdns.com.'
298 unrelatedName
= 'answer.unrelated.tests.powerdns.com.'
299 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
300 unrelatedQuery
= dns
.message
.make_query(unrelatedName
, 'TXT', 'IN')
301 unrelatedResponse
= dns
.message
.make_response(unrelatedQuery
)
302 rrset
= dns
.rrset
.from_text(unrelatedName
,
306 'nothing to see here')
307 unrelatedResponse
.answer
.append(rrset
)
309 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, unrelatedResponse
)
310 self
.assertTrue(receivedQuery
)
311 self
.assertEquals(receivedResponse
, None)
312 receivedQuery
.id = query
.id
313 self
.assertEquals(query
, receivedQuery
)
315 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, unrelatedResponse
)
316 self
.assertTrue(receivedQuery
)
317 self
.assertEquals(receivedResponse
, None)
318 receivedQuery
.id = query
.id
319 self
.assertEquals(query
, receivedQuery
)
321 def testHeaderOnlyRefused(self
):
323 Basics: Header-only refused response
325 name
= 'header-only-refused-response.tests.powerdns.com.'
326 query
= dns
.message
.make_query(name
, 'A', 'IN')
327 response
= dns
.message
.make_response(query
)
328 response
.set_rcode(dns
.rcode
.REFUSED
)
329 response
.question
= []
331 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
332 self
.assertTrue(receivedQuery
)
333 receivedQuery
.id = query
.id
334 self
.assertEquals(query
, receivedQuery
)
335 self
.assertEquals(receivedResponse
, response
)
337 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
338 self
.assertTrue(receivedQuery
)
339 receivedQuery
.id = query
.id
340 self
.assertEquals(query
, receivedQuery
)
341 self
.assertEquals(receivedResponse
, response
)
343 def testHeaderOnlyNoErrorResponse(self
):
345 Basics: Header-only NoError response should be dropped
347 name
= 'header-only-noerror-response.tests.powerdns.com.'
348 query
= dns
.message
.make_query(name
, 'A', 'IN')
349 response
= dns
.message
.make_response(query
)
350 response
.question
= []
352 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
353 self
.assertTrue(receivedQuery
)
354 receivedQuery
.id = query
.id
355 self
.assertEquals(query
, receivedQuery
)
356 self
.assertEquals(receivedResponse
, None)
358 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
359 self
.assertTrue(receivedQuery
)
360 receivedQuery
.id = query
.id
361 self
.assertEquals(query
, receivedQuery
)
362 self
.assertEquals(receivedResponse
, None)
364 def testHeaderOnlyNXDResponse(self
):
366 Basics: Header-only NXD response should be dropped
368 name
= 'header-only-nxd-response.tests.powerdns.com.'
369 query
= dns
.message
.make_query(name
, 'A', 'IN')
370 response
= dns
.message
.make_response(query
)
371 response
.set_rcode(dns
.rcode
.NXDOMAIN
)
372 response
.question
= []
374 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
375 self
.assertTrue(receivedQuery
)
376 receivedQuery
.id = query
.id
377 self
.assertEquals(query
, receivedQuery
)
378 self
.assertEquals(receivedResponse
, None)
380 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
381 self
.assertTrue(receivedQuery
)
382 receivedQuery
.id = query
.id
383 self
.assertEquals(query
, receivedQuery
)
384 self
.assertEquals(receivedResponse
, None)
386 def testAddActionDNSName(self
):
388 Basics: test if addAction accepts a DNSName
390 name
= 'dnsname.addaction.powerdns.com.'
391 query
= dns
.message
.make_query(name
, 'A', 'IN')
392 expectedResponse
= dns
.message
.make_response(query
)
393 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
395 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
396 self
.assertEquals(receivedResponse
, expectedResponse
)
398 def testAddActionDNSNames(self
):
400 Basics: test if addAction accepts a table of DNSNames
402 for name
in ['dnsname-table{}.addaction.powerdns.com.'.format(i
) for i
in range(1,2)]:
403 query
= dns
.message
.make_query(name
, 'A', 'IN')
404 expectedResponse
= dns
.message
.make_response(query
)
405 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
407 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
408 self
.assertEquals(receivedResponse
, expectedResponse
)
410 if __name__
== '__main__':