]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
Merge pull request #8795 from omoerbeek/rec-lua-docs-policytag
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 from dnsdisttests import DNSDistTest
5
class TestBasics(DNSDistTest):
    """
    Basic dnsdist regression tests.

    Each test sends a query through dnsdist over UDP and/or TCP and checks
    either the answer generated by one of the rules declared in
    `_config_template`, or the answer relayed from the test backend.
    """

    # Lua configuration handed to dnsdist; the single '%s' placeholder is
    # the port of the backend declared with newServer.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    truncateTC(true)
    addAction(AndRule{QTypeRule(DNSQType.ANY), TCPRule(false)}, TCAction())
    addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
    mySMN = newSuffixMatchNode()
    mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
    addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
    addAction(makeRule("drop.test.powerdns.com."), DropAction())
    addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
    addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
    addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
    """

    def testDropped(self):
        """
        Basics: Dropped query

        Send an A query for drop.test.powerdns.com. domain,
        which is dropped by configuration. We expect
        no response.
        """
        name = 'drop.test.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertIsNone(receivedResponse)

    def testAWithECS(self):
        """
        Basics: A query with an ECS value
        """
        name = 'awithecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # the query seen by the backend carries a different ID,
            # align it before comparing
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testSimpleA(self):
        """
        Basics: A query without EDNS
        """
        name = 'simplea.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAnyIsTruncated(self):
        """
        Basics: Truncate ANY query

        dnsdist is configured to reply with TC to ANY queries,
        send an ANY query and check the result.
        It should be truncated over UDP, not over TCP.
        """
        name = 'any.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        # dnsdist sets RA = RD for TC responses
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        # over UDP the answer is generated by dnsdist itself
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # over TCP the query is forwarded and the backend answer relayed
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testTruncateTC(self):
        """
        Basics: Truncate TC

        dnsdist is configured to truncate TC (truncateTC(true)),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        response.flags |= dns.flags.TC

        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(expectedResponse.flags, receivedResponse.flags)
        self.assertEqual(expectedResponse.question, receivedResponse.question)
        # every record section must have been stripped
        self.assertFalse(response.answer == receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)
        self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def testTruncateTCEDNS(self):
        """
        Basics: Truncate TC with EDNS

        dnsdist is configured to truncate TC (truncateTC(true)),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        Note that the query and initial response had EDNS,
        so the final response should have it too.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        response = dns.message.make_response(query)
        # force a different responder payload than the one in the query,
        # so we check that we don't just mirror it
        response.payload = 4242
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        response.flags |= dns.flags.TC

        expectedResponse = dns.message.make_response(query)
        expectedResponse.payload = 4242
        expectedResponse.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(expectedResponse.flags, receivedResponse.flags)
        self.assertEqual(expectedResponse.question, receivedResponse.question)
        # every record section must have been stripped
        self.assertFalse(response.answer == receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)
        self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
        self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
        self.assertEqual(receivedResponse.payload, 4242)

    def testRegexReturnsRefused(self):
        """
        Basics: Refuse query matching regex

        dnsdist is configured to reply 'refused' for query
        matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
        We send a query for evil4242.regex.tests.powerdns.com
        and check that the response is "refused".
        """
        name = 'evil4242.regex.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testQNameReturnsSpoofed(self):
        """
        Basics: test QNameRule and Spoof

        dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
        """
        name = 'ds9a.nl.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOERROR)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testDomainAndQTypeReturnsNotImplemented(self):
        """
        Basics: NOTIMPL for specific name and qtype

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "nameAndQtype.tests.powerdns.com."
        and check that the response is 'not implemented'.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testDomainWithoutQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qtype canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a A query for "nameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testOtherDomainANDQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qname canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'OtherNameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testWrongResponse(self):
        """
        Basics: Unrelated response from the backend

        The backend sends an unrelated answer over UDP, it should
        be discarded by dnsdist. It could happen if we wrap around
        maxOutstanding queries too quickly or have more than maxOutstanding
        queries to a specific backend in the air over UDP,
        but does not really make sense over TCP.
        """
        name = 'query.unrelated.tests.powerdns.com.'
        unrelatedName = 'answer.unrelated.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
        unrelatedResponse = dns.message.make_response(unrelatedQuery)
        rrset = dns.rrset.from_text(unrelatedName,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        unrelatedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, unrelatedResponse)
            self.assertTrue(receivedQuery)
            self.assertIsNone(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)

    def testHeaderOnlyRefused(self):
        """
        Basics: Header-only refused response

        The backend answers REFUSED with an empty question section,
        and we check that the client receives that response.
        """
        name = 'header-only-refused-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.REFUSED)
        # strip the question section so only the header remains
        response.question = []

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

    def testHeaderOnlyNoErrorResponse(self):
        """
        Basics: Header-only NoError response should be dropped
        """
        name = 'header-only-noerror-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # strip the question section so only the header remains
        response.question = []

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertIsNone(receivedResponse)

    def testHeaderOnlyNXDResponse(self):
        """
        Basics: Header-only NXD response should be dropped
        """
        name = 'header-only-nxd-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.NXDOMAIN)
        # strip the question section so only the header remains
        response.question = []

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertIsNone(receivedResponse)

    def testAddActionDNSName(self):
        """
        Basics: test if addAction accepts a DNSName
        """
        name = 'dnsname.addaction.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testAddActionDNSNames(self):
        """
        Basics: test if addAction accepts a table of DNSNames
        """
        # the configuration declares dnsname-table1 and dnsname-table2,
        # so the (exclusive) upper bound has to be 3 to cover both
        names = ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1, 3)]
        for name in names:
            query = dns.message.make_query(name, 'A', 'IN')
            query.flags &= ~dns.flags.RD
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            for method in ("sendUDPQuery", "sendTCPQuery"):
                sender = getattr(self, method)
                (_, receivedResponse) = sender(query, response=None, useQueue=False)
                self.assertEqual(receivedResponse, expectedResponse)
426