]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class TestBasics(DNSDistTest
):
10 newServer{address="127.0.0.1:%s"}
12 addAction(AndRule{QTypeRule(DNSQType.ANY), TCPRule(false)}, TCAction())
13 addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
14 mySMN = newSuffixMatchNode()
15 mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
16 addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
17 addAction(makeRule("drop.test.powerdns.com."), DropAction())
18 addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
19 addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
20 addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
23 def testDropped(self
):
27 Send an A query for drop.test.powerdns.com. domain,
28 which is dropped by configuration. We expect
31 name
= 'drop.test.powerdns.com.'
32 query
= dns
.message
.make_query(name
, 'A', 'IN')
33 for method
in ("sendUDPQuery", "sendTCPQuery"):
34 sender
= getattr(self
, method
)
35 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
36 self
.assertEquals(receivedResponse
, None)
38 def testAWithECS(self
):
40 Basics: A query with an ECS value
42 name
= 'awithecs.tests.powerdns.com.'
43 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
44 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
])
45 response
= dns
.message
.make_response(query
)
46 rrset
= dns
.rrset
.from_text(name
,
52 response
.answer
.append(rrset
)
54 for method
in ("sendUDPQuery", "sendTCPQuery"):
55 sender
= getattr(self
, method
)
56 (receivedQuery
, receivedResponse
) = sender(query
, response
)
57 receivedQuery
.id = query
.id
58 self
.assertEquals(query
, receivedQuery
)
59 self
.assertEquals(response
, receivedResponse
)
61 def testSimpleA(self
):
63 Basics: A query without EDNS
65 name
= 'simplea.tests.powerdns.com.'
66 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
67 response
= dns
.message
.make_response(query
)
68 rrset
= dns
.rrset
.from_text(name
,
73 response
.answer
.append(rrset
)
75 for method
in ("sendUDPQuery", "sendTCPQuery"):
76 sender
= getattr(self
, method
)
77 (receivedQuery
, receivedResponse
) = sender(query
, response
)
78 self
.assertTrue(receivedQuery
)
79 self
.assertTrue(receivedResponse
)
80 receivedQuery
.id = query
.id
81 self
.assertEquals(query
, receivedQuery
)
82 self
.assertEquals(response
, receivedResponse
)
84 def testAnyIsTruncated(self
):
86 Basics: Truncate ANY query
88 dnsdist is configured to reply with TC to ANY queries,
89 send an ANY query and check the result.
90 It should be truncated over UDP, not over TCP.
92 name
= 'any.tests.powerdns.com.'
93 query
= dns
.message
.make_query(name
, 'ANY', 'IN')
94 expectedResponse
= dns
.message
.make_response(query
)
95 expectedResponse
.flags |
= dns
.flags
.TC
97 (_
, receivedResponse
) = self
.sendUDPQuery(query
, response
=None, useQueue
=False)
98 self
.assertEquals(receivedResponse
, expectedResponse
)
100 response
= dns
.message
.make_response(query
)
101 rrset
= dns
.rrset
.from_text(name
,
107 response
.answer
.append(rrset
)
109 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
110 self
.assertTrue(receivedQuery
)
111 self
.assertTrue(receivedResponse
)
112 receivedQuery
.id = query
.id
113 self
.assertEquals(query
, receivedQuery
)
114 self
.assertEquals(receivedResponse
, response
)
116 def testTruncateTC(self
):
120 dnsdist is configured to truncate TC (default),
121 we make the backend send responses
122 with TC set and additional content,
123 and check that the received response has been fixed.
125 name
= 'atruncatetc.tests.powerdns.com.'
126 query
= dns
.message
.make_query(name
, 'A', 'IN')
127 response
= dns
.message
.make_response(query
)
128 rrset
= dns
.rrset
.from_text(name
,
134 response
.answer
.append(rrset
)
135 response
.flags |
= dns
.flags
.TC
136 expectedResponse
= dns
.message
.make_response(query
)
137 expectedResponse
.flags |
= dns
.flags
.TC
139 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
140 receivedQuery
.id = query
.id
141 self
.assertEquals(query
, receivedQuery
)
142 self
.assertEquals(expectedResponse
.flags
, receivedResponse
.flags
)
143 self
.assertEquals(expectedResponse
.question
, receivedResponse
.question
)
144 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
145 self
.assertEquals(len(receivedResponse
.answer
), 0)
146 self
.assertEquals(len(receivedResponse
.authority
), 0)
147 self
.assertEquals(len(receivedResponse
.additional
), 0)
148 self
.checkMessageNoEDNS(expectedResponse
, receivedResponse
)
150 def testTruncateTCEDNS(self
):
152 Basics: Truncate TC with EDNS
154 dnsdist is configured to truncate TC (default),
155 we make the backend send responses
156 with TC set and additional content,
157 and check that the received response has been fixed.
158 Note that the query and initial response had EDNS,
159 so the final response should have it too.
161 name
= 'atruncatetc.tests.powerdns.com.'
162 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, want_dnssec
=True)
163 response
= dns
.message
.make_response(query
)
164 # force a different responder payload than the one in the query,
165 # so we check that we don't just mirror it
166 response
.payload
= 4242
167 rrset
= dns
.rrset
.from_text(name
,
173 response
.answer
.append(rrset
)
174 response
.flags |
= dns
.flags
.TC
175 expectedResponse
= dns
.message
.make_response(query
)
176 expectedResponse
.payload
= 4242
177 expectedResponse
.flags |
= dns
.flags
.TC
179 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
180 receivedQuery
.id = query
.id
181 self
.assertEquals(query
, receivedQuery
)
182 self
.assertEquals(response
.flags
, receivedResponse
.flags
)
183 self
.assertEquals(response
.question
, receivedResponse
.question
)
184 self
.assertFalse(response
.answer
== receivedResponse
.answer
)
185 self
.assertEquals(len(receivedResponse
.answer
), 0)
186 self
.assertEquals(len(receivedResponse
.authority
), 0)
187 self
.assertEquals(len(receivedResponse
.additional
), 0)
188 print(expectedResponse
)
189 print(receivedResponse
)
190 self
.checkMessageEDNSWithoutOptions(expectedResponse
, receivedResponse
)
191 self
.assertFalse(receivedResponse
.ednsflags
& dns
.flags
.DO
)
192 self
.assertEquals(receivedResponse
.payload
, 4242)
194 def testRegexReturnsRefused(self
):
196 Basics: Refuse query matching regex
198 dnsdist is configured to reply 'refused' for query
199 matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
200 We send a query for evil4242.powerdns.com
201 and check that the response is "refused".
203 name
= 'evil4242.regex.tests.powerdns.com.'
204 query
= dns
.message
.make_query(name
, 'A', 'IN')
205 expectedResponse
= dns
.message
.make_response(query
)
206 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
208 for method
in ("sendUDPQuery", "sendTCPQuery"):
209 sender
= getattr(self
, method
)
210 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
211 self
.assertEquals(receivedResponse
, expectedResponse
)
213 def testQNameReturnsSpoofed(self
):
215 Basics: test QNameRule and Spoof
217 dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
220 query
= dns
.message
.make_query(name
, 'A', 'IN')
221 query
.flags
&= ~dns
.flags
.RD
222 expectedResponse
= dns
.message
.make_response(query
)
223 expectedResponse
.set_rcode(dns
.rcode
.NOERROR
)
224 rrset
= dns
.rrset
.from_text(name
,
229 expectedResponse
.answer
.append(rrset
)
231 for method
in ("sendUDPQuery", "sendTCPQuery"):
232 sender
= getattr(self
, method
)
233 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
234 self
.assertEquals(receivedResponse
, expectedResponse
)
236 def testDomainAndQTypeReturnsNotImplemented(self
):
238 Basics: NOTIMPL for specific name and qtype
240 dnsdist is configured to reply 'not implemented' for query
241 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
242 We send a TXT query for "nameAndQtype.powerdns.com."
243 and check that the response is 'not implemented'.
245 name
= 'nameAndQtype.tests.powerdns.com.'
246 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
247 expectedResponse
= dns
.message
.make_response(query
)
248 expectedResponse
.set_rcode(dns
.rcode
.NOTIMP
)
250 for method
in ("sendUDPQuery", "sendTCPQuery"):
251 sender
= getattr(self
, method
)
252 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
253 self
.assertEquals(receivedResponse
, expectedResponse
)
255 def testDomainWithoutQTypeIsNotAffected(self
):
257 Basics: NOTIMPL qtype canary
259 dnsdist is configured to reply 'not implemented' for query
260 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
261 We send a A query for "nameAndQtype.tests.powerdns.com."
262 and check that the response is OK.
264 name
= 'nameAndQtype.tests.powerdns.com.'
265 query
= dns
.message
.make_query(name
, 'A', 'IN')
266 response
= dns
.message
.make_response(query
)
267 rrset
= dns
.rrset
.from_text(name
,
272 response
.answer
.append(rrset
)
274 for method
in ("sendUDPQuery", "sendTCPQuery"):
275 sender
= getattr(self
, method
)
276 (receivedQuery
, receivedResponse
) = sender(query
, response
)
277 self
.assertTrue(receivedQuery
)
278 self
.assertTrue(receivedResponse
)
279 receivedQuery
.id = query
.id
280 self
.assertEquals(query
, receivedQuery
)
281 self
.assertEquals(response
, receivedResponse
)
283 def testOtherDomainANDQTypeIsNotAffected(self
):
285 Basics: NOTIMPL qname canary
287 dnsdist is configured to reply 'not implemented' for query
288 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
289 We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
290 and check that the response is OK.
292 name
= 'OtherNameAndQtype.tests.powerdns.com.'
293 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
294 response
= dns
.message
.make_response(query
)
295 rrset
= dns
.rrset
.from_text(name
,
299 'nothing to see here')
300 response
.answer
.append(rrset
)
302 for method
in ("sendUDPQuery", "sendTCPQuery"):
303 sender
= getattr(self
, method
)
304 (receivedQuery
, receivedResponse
) = sender(query
, response
)
305 self
.assertTrue(receivedQuery
)
306 self
.assertTrue(receivedResponse
)
307 receivedQuery
.id = query
.id
308 self
.assertEquals(query
, receivedQuery
)
309 self
.assertEquals(response
, receivedResponse
)
311 def testWrongResponse(self
):
313 Basics: Unrelated response from the backend
315 The backend send an unrelated answer over UDP, it should
316 be discarded by dnsdist. It could happen if we wrap around
317 maxOutstanding queries too quickly or have more than maxOutstanding
318 queries to a specific backend in the air over UDP,
319 but does not really make sense over TCP.
321 name
= 'query.unrelated.tests.powerdns.com.'
322 unrelatedName
= 'answer.unrelated.tests.powerdns.com.'
323 query
= dns
.message
.make_query(name
, 'TXT', 'IN')
324 unrelatedQuery
= dns
.message
.make_query(unrelatedName
, 'TXT', 'IN')
325 unrelatedResponse
= dns
.message
.make_response(unrelatedQuery
)
326 rrset
= dns
.rrset
.from_text(unrelatedName
,
330 'nothing to see here')
331 unrelatedResponse
.answer
.append(rrset
)
333 for method
in ("sendUDPQuery", "sendTCPQuery"):
334 sender
= getattr(self
, method
)
335 (receivedQuery
, receivedResponse
) = sender(query
, unrelatedResponse
)
336 self
.assertTrue(receivedQuery
)
337 self
.assertEquals(receivedResponse
, None)
338 receivedQuery
.id = query
.id
339 self
.assertEquals(query
, receivedQuery
)
341 def testHeaderOnlyRefused(self
):
343 Basics: Header-only refused response
345 name
= 'header-only-refused-response.tests.powerdns.com.'
346 query
= dns
.message
.make_query(name
, 'A', 'IN')
347 response
= dns
.message
.make_response(query
)
348 response
.set_rcode(dns
.rcode
.REFUSED
)
349 response
.question
= []
351 for method
in ("sendUDPQuery", "sendTCPQuery"):
352 sender
= getattr(self
, method
)
353 (receivedQuery
, receivedResponse
) = sender(query
, response
)
354 self
.assertTrue(receivedQuery
)
355 receivedQuery
.id = query
.id
356 self
.assertEquals(query
, receivedQuery
)
357 self
.assertEquals(receivedResponse
, response
)
359 def testHeaderOnlyNoErrorResponse(self
):
361 Basics: Header-only NoError response should be dropped
363 name
= 'header-only-noerror-response.tests.powerdns.com.'
364 query
= dns
.message
.make_query(name
, 'A', 'IN')
365 response
= dns
.message
.make_response(query
)
366 response
.question
= []
368 for method
in ("sendUDPQuery", "sendTCPQuery"):
369 sender
= getattr(self
, method
)
370 (receivedQuery
, receivedResponse
) = sender(query
, response
)
371 self
.assertTrue(receivedQuery
)
372 receivedQuery
.id = query
.id
373 self
.assertEquals(query
, receivedQuery
)
374 self
.assertEquals(receivedResponse
, None)
376 def testHeaderOnlyNXDResponse(self
):
378 Basics: Header-only NXD response should be dropped
380 name
= 'header-only-nxd-response.tests.powerdns.com.'
381 query
= dns
.message
.make_query(name
, 'A', 'IN')
382 response
= dns
.message
.make_response(query
)
383 response
.set_rcode(dns
.rcode
.NXDOMAIN
)
384 response
.question
= []
386 for method
in ("sendUDPQuery", "sendTCPQuery"):
387 sender
= getattr(self
, method
)
388 (receivedQuery
, receivedResponse
) = sender(query
, response
)
389 self
.assertTrue(receivedQuery
)
390 receivedQuery
.id = query
.id
391 self
.assertEquals(query
, receivedQuery
)
392 self
.assertEquals(receivedResponse
, None)
394 def testAddActionDNSName(self
):
396 Basics: test if addAction accepts a DNSName
398 name
= 'dnsname.addaction.powerdns.com.'
399 query
= dns
.message
.make_query(name
, 'A', 'IN')
400 expectedResponse
= dns
.message
.make_response(query
)
401 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
403 for method
in ("sendUDPQuery", "sendTCPQuery"):
404 sender
= getattr(self
, method
)
405 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
406 self
.assertEquals(receivedResponse
, expectedResponse
)
408 def testAddActionDNSNames(self
):
410 Basics: test if addAction accepts a table of DNSNames
412 for name
in ['dnsname-table{}.addaction.powerdns.com.'.format(i
) for i
in range(1,2)]:
413 query
= dns
.message
.make_query(name
, 'A', 'IN')
414 expectedResponse
= dns
.message
.make_response(query
)
415 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
417 for method
in ("sendUDPQuery", "sendTCPQuery"):
418 sender
= getattr(self
, method
)
419 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
420 self
.assertEquals(receivedResponse
, expectedResponse
)