]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
3 import clientsubnetoption
4 from dnsdisttests
import DNSDistTest
6 class TestBasics(DNSDistTest
):
9 newServer{address="127.0.0.1:%s"}
11 addAction(AndRule{QTypeRule(DNSQType.ANY), TCPRule(false)}, TCAction())
12 addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
13 mySMN = newSuffixMatchNode()
14 mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
15 addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
16 addAction(makeRule("drop.test.powerdns.com."), DropAction())
17 addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
18 addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
19 addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
def testDropped(self):
    """
    Basics: Dropped query

    Send an A query for drop.test.powerdns.com., a name that the
    dnsdist configuration drops, over both UDP and TCP, and check
    that no response is received for either transport.
    """
    name = 'drop.test.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        # No backend response is queued: the query is not expected to be
        # forwarded at all.
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)
37 def testAWithECS(self
):
39 Basics: A query with an ECS value
41 name
= 'awithecs.tests.powerdns.com.'
42 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
43 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
])
44 response
= dns
.message
.make_response(query
)
45 rrset
= dns
.rrset
.from_text(name
,
51 response
.answer
.append(rrset
)
53 for method
in ("sendUDPQuery", "sendTCPQuery"):
54 sender
= getattr(self
, method
)
55 (receivedQuery
, receivedResponse
) = sender(query
, response
)
56 receivedQuery
.id = query
.id
57 self
.assertEquals(query
, receivedQuery
)
58 self
.assertEquals(response
, receivedResponse
)
60 def testSimpleA(self
):
62 Basics: A query without EDNS
64 name
= 'simplea.tests.powerdns.com.'
65 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
66 response
= dns
.message
.make_response(query
)
67 rrset
= dns
.rrset
.from_text(name
,
72 response
.answer
.append(rrset
)
74 for method
in ("sendUDPQuery", "sendTCPQuery"):
75 sender
= getattr(self
, method
)
76 (receivedQuery
, receivedResponse
) = sender(query
, response
)
77 self
.assertTrue(receivedQuery
)
78 self
.assertTrue(receivedResponse
)
79 receivedQuery
.id = query
.id
80 self
.assertEquals(query
, receivedQuery
)
81 self
.assertEquals(response
, receivedResponse
)
83 def testAnyIsTruncated(self
):
85 Basics: Truncate ANY query
87 dnsdist is configured to reply with TC to ANY queries,
88 send an ANY query and check the result.
89 It should be truncated over UDP, not over TCP.
91 name
= 'any.tests.powerdns.com.'
92 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
93 # dnsdist sets RA = RD for TC responses
94 query
.flags
&= ~dns
.flags
.RD
95 expectedResponse
= dns
.message
.make_response(query
)
96 expectedResponse
.flags |
= dns
.flags
.TC
98 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
99 self
.assertEquals(receivedResponse
, expectedResponse
)
101 response
= dns
.message
.make_response(query
)
102 rrset
= dns
.rrset
.from_text(name
,
108 response
.answer
.append(rrset
)
110 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
111 self
.assertTrue(receivedQuery
)
112 self
.assertTrue(receivedResponse
)
113 receivedQuery
.id = query
.id
114 self
.assertEquals(query
, receivedQuery
)
115 self
.assertEquals(receivedResponse
, response
)
117 def testTruncateTC(self
):
121 dnsdist is configured to truncate TC (default),
122 we make the backend send responses
123 with TC set and additional content,
124 and check that the received response has been fixed.
126 name
= 'atruncatetc.tests.powerdns.com.'
127 query
= dns
.message
.make_query(name
, 'A', 'IN')
128 response
= dns
.message
.make_response(query
)
129 rrset
= dns
.rrset
.from_text(name
,
135 response
.answer
.append(rrset
)
136 response
.flags |
= dns
.flags
.TC
137 expectedResponse
= dns
.message
.make_response(query
)
138 expectedResponse
.flags |
= dns
.flags
.TC
140 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
141 receivedQuery
.id = query
.id
142 self
.assertEquals(query
, receivedQuery
)
143 self
.assertEquals(expectedResponse
.flags
, receivedResponse
.flags
)
144 self
.assertEquals(expectedResponse
.question
, receivedResponse
.question
)
145 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
146 self
.assertEquals(len(receivedResponse
.answer
), 0)
147 self
.assertEquals(len(receivedResponse
.authority
), 0)
148 self
.assertEquals(len(receivedResponse
.additional
), 0)
149 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
151 def testTruncateTCEDNS(self
):
153 Basics: Truncate TC with EDNS
155 dnsdist is configured to truncate TC (default),
156 we make the backend send responses
157 with TC set and additional content,
158 and check that the received response has been fixed.
159 Note that the query and initial response had EDNS,
160 so the final response should have it too.
162 name
= 'atruncatetc.tests.powerdns.com.'
163 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
164 response
= dns
.message
.make_response(query
)
165 # force a different responder payload than the one in the query,
166 # so we check that we don't just mirror it
167 response
.payload
= 4242
168 rrset
= dns
.rrset
.from_text(name
,
174 response
.answer
.append(rrset
)
175 response
.flags |
= dns
.flags
.TC
176 expectedResponse
= dns
.message
.make_response(query
)
177 expectedResponse
.payload
= 4242
178 expectedResponse
.flags |
= dns
.flags
.TC
180 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
181 receivedQuery
.id = query
.id
182 self
.assertEquals(query
, receivedQuery
)
183 self
.assertEquals(response
.flags
, receivedResponse
.flags
)
184 self
.assertEquals(response
.question
, receivedResponse
.question
)
185 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
186 self
.assertEquals(len(receivedResponse
.answer
), 0)
187 self
.assertEquals(len(receivedResponse
.authority
), 0)
188 self
.assertEquals(len(receivedResponse
.additional
), 0)
189 print(expectedResponse
)
190 print(receivedResponse
)
191 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
192 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
193 self
.assertEquals(receivedResponse
.payload
, 4242)
def testRegexReturnsRefused(self):
    """
    Basics: Refuse query matching regex

    dnsdist is configured to reply 'refused' for queries
    matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
    We send an A query for evil4242.regex.tests.powerdns.com.
    over UDP and TCP and check that the response is 'refused'.
    """
    name = 'evil4242.regex.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # Clear RD so the response built below carries neither RD nor RA;
    # presumably needed because dnsdist sets RA = RD on the answers it
    # generates itself -- TODO confirm.
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        # The answer is expected to come from dnsdist itself, so no
        # backend response is queued.
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
215 def testQNameReturnsSpoofed(self
):
217 Basics: test QNameRule and Spoof
219 dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
222 query
= dns
.message
.make_query(name
, 'A', 'IN')
223 query
.flags
&= ~dns
.flags
.RD
224 expectedResponse
= dns
.message
.make_response(query
)
225 expectedResponse
.set_rcode(dns
.rcode
.NOERROR
)
226 rrset
= dns
.rrset
.from_text(name
,
231 expectedResponse
.answer
.append(rrset
)
233 for method
in ("sendUDPQuery", "sendTCPQuery"):
234 sender
= getattr(self
, method
)
235 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
236 self
.assertEquals(receivedResponse
, expectedResponse
)
def testDomainAndQTypeReturnsNotImplemented(self):
    """
    Basics: NOTIMPL for specific name and qtype

    dnsdist is configured to reply 'not implemented' for queries
    matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
    We send a TXT query for "nameAndQtype.tests.powerdns.com."
    over UDP and TCP and check that the response is 'not implemented'.
    """
    name = 'nameAndQtype.tests.powerdns.com.'
    query = dns.message.make_query(name, 'TXT', 'IN')
    # Clear RD so the response built below carries neither RD nor RA;
    # presumably needed because dnsdist sets RA = RD on the answers it
    # generates itself -- TODO confirm.
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.NOTIMP)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        # The answer is expected to come from dnsdist itself, so no
        # backend response is queued.
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
258 def testDomainWithoutQTypeIsNotAffected(self
):
260 Basics: NOTIMPL qtype canary
262 dnsdist is configured to reply 'not implemented' for query
263 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
264 We send a A query for "nameAndQtype.tests.powerdns.com."
265 and check that the response is OK.
267 name
= 'nameAndQtype.tests.powerdns.com.'
268 query
= dns
.message
.make_query(name
, 'A', 'IN')
269 response
= dns
.message
.make_response(query
)
270 rrset
= dns
.rrset
.from_text(name
,
275 response
.answer
.append(rrset
)
277 for method
in ("sendUDPQuery", "sendTCPQuery"):
278 sender
= getattr(self
, method
)
279 (receivedQuery
, receivedResponse
) = sender(query
, response
)
280 self
.assertTrue(receivedQuery
)
281 self
.assertTrue(receivedResponse
)
282 receivedQuery
.id = query
.id
283 self
.assertEquals(query
, receivedQuery
)
284 self
.assertEquals(response
, receivedResponse
)
286 def testOtherDomainANDQTypeIsNotAffected(self
):
288 Basics: NOTIMPL qname canary
290 dnsdist is configured to reply 'not implemented' for query
291 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
292 We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
293 and check that the response is OK.
295 name
= 'OtherNameAndQtype.tests.powerdns.com.'
296 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
297 response
= dns
.message
.make_response(query
)
298 rrset
= dns
.rrset
.from_text(name
,
302 'nothing to see here')
303 response
.answer
.append(rrset
)
305 for method
in ("sendUDPQuery", "sendTCPQuery"):
306 sender
= getattr(self
, method
)
307 (receivedQuery
, receivedResponse
) = sender(query
, response
)
308 self
.assertTrue(receivedQuery
)
309 self
.assertTrue(receivedResponse
)
310 receivedQuery
.id = query
.id
311 self
.assertEquals(query
, receivedQuery
)
312 self
.assertEquals(response
, receivedResponse
)
314 def testWrongResponse(self
):
316 Basics: Unrelated response from the backend
318 The backend send an unrelated answer over UDP, it should
319 be discarded by dnsdist. It could happen if we wrap around
320 maxOutstanding queries too quickly or have more than maxOutstanding
321 queries to a specific backend in the air over UDP,
322 but does not really make sense over TCP.
324 name
= 'query.unrelated.tests.powerdns.com.'
325 unrelatedName
= 'answer.unrelated.tests.powerdns.com.'
326 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
327 unrelatedQuery
= dns
.message
.make_query(unrelatedName
, 'TXT', 'IN')
328 unrelatedResponse
= dns
.message
.make_response(unrelatedQuery
)
329 rrset
= dns
.rrset
.from_text(unrelatedName
,
333 'nothing to see here')
334 unrelatedResponse
.answer
.append(rrset
)
336 for method
in ("sendUDPQuery", "sendTCPQuery"):
337 sender
= getattr(self
, method
)
338 (receivedQuery
, receivedResponse
) = sender(query
, unrelatedResponse
)
339 self
.assertTrue(receivedQuery
)
340 self
.assertEquals(receivedResponse
, None)
341 receivedQuery
.id = query
.id
342 self
.assertEquals(query
, receivedQuery
)
def testHeaderOnlyRefused(self):
    """
    Basics: Header-only refused response

    The backend answers with a REFUSED response whose question
    section is empty. The test expects that response to be
    received unchanged over both UDP and TCP.
    """
    name = 'header-only-refused-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.REFUSED)
    # Strip the question section so only the header remains.
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the IDs before comparing; presumably the ID seen by the
        # backend differs from the client's -- TODO confirm.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
def testHeaderOnlyNoErrorResponse(self):
    """
    Basics: Header-only NoError response should be dropped

    The backend answers with a NoError response whose question
    section is empty. The test expects the query to reach the
    backend but no response to be received, over both UDP and TCP.
    """
    name = 'header-only-noerror-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    # Strip the question section so only the header remains.
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the IDs before comparing; presumably the ID seen by the
        # backend differs from the client's -- TODO confirm.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
def testHeaderOnlyNXDResponse(self):
    """
    Basics: Header-only NXD response should be dropped

    The backend answers with an NXDOMAIN response whose question
    section is empty. The test expects the query to reach the
    backend but no response to be received, over both UDP and TCP.
    """
    name = 'header-only-nxd-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.NXDOMAIN)
    # Strip the question section so only the header remains.
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        # Align the IDs before comparing; presumably the ID seen by the
        # backend differs from the client's -- TODO confirm.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
def testAddActionDNSName(self):
    """
    Basics: test if addAction accepts a DNSName

    The configuration passes a single DNSName
    (dnsname.addaction.powerdns.com.) to addAction with a REFUSED
    action. We send an A query for that name over UDP and TCP and
    check that the response is 'refused'.
    """
    name = 'dnsname.addaction.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # Clear RD so the response built below carries neither RD nor RA;
    # presumably needed because dnsdist sets RA = RD on the answers it
    # generates itself -- TODO confirm.
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        # The answer is expected to come from dnsdist itself, so no
        # backend response is queued.
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
def testAddActionDNSNames(self):
    """
    Basics: test if addAction accepts a table of DNSNames

    The configuration passes a table holding two DNSNames
    (dnsname-table1 and dnsname-table2 under addaction.powerdns.com.)
    to addAction with a REFUSED action. For each of the two names we
    send an A query over UDP and TCP and check that the response is
    'refused'.
    """
    # range(1, 3) covers both configured names; the previous range(1, 2)
    # only produced 1, so dnsname-table2 was never exercised.
    for index in range(1, 3):
        name = 'dnsname-table{}.addaction.powerdns.com.'.format(index)
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear RD so the response built below carries neither RD nor RA;
        # presumably needed because dnsdist sets RA = RD on the answers
        # it generates itself -- TODO confirm.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # The answer is expected to come from dnsdist itself, so no
            # backend response is queued.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)