]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Basics.py
Merge pull request #8713 from rgacogne/auth-strict-caches-size
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
CommitLineData
ca404e94 1#!/usr/bin/env python
b1bec9f0
RG
2import dns
3import clientsubnetoption
ca404e94
RG
4from dnsdisttests import DNSDistTest
5
6class TestBasics(DNSDistTest):
7
903853f4
RG
8 _config_template = """
9 newServer{address="127.0.0.1:%s"}
10 truncateTC(true)
d3ec24f9
PD
11 addAction(AndRule{QTypeRule(DNSQType.ANY), TCPRule(false)}, TCAction())
12 addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
903853f4
RG
13 mySMN = newSuffixMatchNode()
14 mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
d3ec24f9 15 addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
903853f4 16 addAction(makeRule("drop.test.powerdns.com."), DropAction())
d3ec24f9
PD
17 addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
18 addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
19 addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
903853f4
RG
20 """
21
b63add03
RG
22 def testDropped(self):
23 """
24 Basics: Dropped query
25
26 Send an A query for drop.test.powerdns.com. domain,
27 which is dropped by configuration. We expect
28 no response.
29 """
30 name = 'drop.test.powerdns.com.'
31 query = dns.message.make_query(name, 'A', 'IN')
6ca2e796
RG
32 for method in ("sendUDPQuery", "sendTCPQuery"):
33 sender = getattr(self, method)
34 (_, receivedResponse) = sender(query, response=None, useQueue=False)
35 self.assertEquals(receivedResponse, None)
b63add03 36
ca404e94
RG
37 def testAWithECS(self):
38 """
617dfe22 39 Basics: A query with an ECS value
ca404e94
RG
40 """
41 name = 'awithecs.tests.powerdns.com.'
42 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
43 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
44 response = dns.message.make_response(query)
45 rrset = dns.rrset.from_text(name,
feb22f99 46 60,
ca404e94
RG
47 dns.rdataclass.IN,
48 dns.rdatatype.A,
49 '127.0.0.1')
50
51 response.answer.append(rrset)
52
6ca2e796
RG
53 for method in ("sendUDPQuery", "sendTCPQuery"):
54 sender = getattr(self, method)
55 (receivedQuery, receivedResponse) = sender(query, response)
56 receivedQuery.id = query.id
57 self.assertEquals(query, receivedQuery)
58 self.assertEquals(response, receivedResponse)
ca404e94
RG
59
60 def testSimpleA(self):
61 """
617dfe22 62 Basics: A query without EDNS
ca404e94
RG
63 """
64 name = 'simplea.tests.powerdns.com.'
65 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
66 response = dns.message.make_response(query)
e7a1029c 67 rrset = dns.rrset.from_text(name,
ca404e94
RG
68 3600,
69 dns.rdataclass.IN,
70 dns.rdatatype.A,
71 '127.0.0.1')
72 response.answer.append(rrset)
73
6ca2e796
RG
74 for method in ("sendUDPQuery", "sendTCPQuery"):
75 sender = getattr(self, method)
76 (receivedQuery, receivedResponse) = sender(query, response)
77 self.assertTrue(receivedQuery)
78 self.assertTrue(receivedResponse)
79 receivedQuery.id = query.id
80 self.assertEquals(query, receivedQuery)
81 self.assertEquals(response, receivedResponse)
ca404e94 82
ec5f5c6b
RG
83 def testAnyIsTruncated(self):
84 """
617dfe22
RG
85 Basics: Truncate ANY query
86
ec5f5c6b
RG
87 dnsdist is configured to reply with TC to ANY queries,
88 send an ANY query and check the result.
490a29bb 89 It should be truncated over UDP, not over TCP.
ec5f5c6b
RG
90 """
91 name = 'any.tests.powerdns.com.'
92 query = dns.message.make_query(name, 'ANY', 'IN')
955b9377
RG
93 # dnsdist sets RA = RD for TC responses
94 query.flags &= ~dns.flags.RD
ec5f5c6b
RG
95 expectedResponse = dns.message.make_response(query)
96 expectedResponse.flags |= dns.flags.TC
97
0a2087eb 98 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
ec5f5c6b
RG
99 self.assertEquals(receivedResponse, expectedResponse)
100
490a29bb 101 response = dns.message.make_response(query)
e7a1029c 102 rrset = dns.rrset.from_text(name,
490a29bb
RG
103 3600,
104 dns.rdataclass.IN,
105 dns.rdatatype.A,
106 '127.0.0.1')
107
108 response.answer.append(rrset)
109
110 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
111 self.assertTrue(receivedQuery)
112 self.assertTrue(receivedResponse)
113 receivedQuery.id = query.id
490a29bb
RG
114 self.assertEquals(query, receivedQuery)
115 self.assertEquals(receivedResponse, response)
ec5f5c6b
RG
116
117 def testTruncateTC(self):
118 """
617dfe22
RG
119 Basics: Truncate TC
120
ec5f5c6b
RG
121 dnsdist is configured to truncate TC (default),
122 we make the backend send responses
123 with TC set and additional content,
124 and check that the received response has been fixed.
125 """
126 name = 'atruncatetc.tests.powerdns.com.'
127 query = dns.message.make_query(name, 'A', 'IN')
128 response = dns.message.make_response(query)
129 rrset = dns.rrset.from_text(name,
130 3600,
131 dns.rdataclass.IN,
132 dns.rdatatype.A,
133 '127.0.0.1')
134
135 response.answer.append(rrset)
136 response.flags |= dns.flags.TC
e0fd37ec
RG
137 expectedResponse = dns.message.make_response(query)
138 expectedResponse.flags |= dns.flags.TC
139
140 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
141 receivedQuery.id = query.id
142 self.assertEquals(query, receivedQuery)
143 self.assertEquals(expectedResponse.flags, receivedResponse.flags)
144 self.assertEquals(expectedResponse.question, receivedResponse.question)
145 self.assertFalse(response.answer == receivedResponse.answer)
146 self.assertEquals(len(receivedResponse.answer), 0)
147 self.assertEquals(len(receivedResponse.authority), 0)
148 self.assertEquals(len(receivedResponse.additional), 0)
149 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
150
151 def testTruncateTCEDNS(self):
152 """
153 Basics: Truncate TC with EDNS
154
155 dnsdist is configured to truncate TC (default),
156 we make the backend send responses
157 with TC set and additional content,
158 and check that the received response has been fixed.
159 Note that the query and initial response had EDNS,
160 so the final response should have it too.
161 """
162 name = 'atruncatetc.tests.powerdns.com.'
163 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
164 response = dns.message.make_response(query)
165 # force a different responder payload than the one in the query,
166 # so we check that we don't just mirror it
167 response.payload = 4242
168 rrset = dns.rrset.from_text(name,
169 3600,
170 dns.rdataclass.IN,
171 dns.rdatatype.A,
172 '127.0.0.1')
173
174 response.answer.append(rrset)
175 response.flags |= dns.flags.TC
176 expectedResponse = dns.message.make_response(query)
177 expectedResponse.payload = 4242
178 expectedResponse.flags |= dns.flags.TC
ec5f5c6b
RG
179
180 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
181 receivedQuery.id = query.id
ec5f5c6b
RG
182 self.assertEquals(query, receivedQuery)
183 self.assertEquals(response.flags, receivedResponse.flags)
184 self.assertEquals(response.question, receivedResponse.question)
185 self.assertFalse(response.answer == receivedResponse.answer)
186 self.assertEquals(len(receivedResponse.answer), 0)
187 self.assertEquals(len(receivedResponse.authority), 0)
188 self.assertEquals(len(receivedResponse.additional), 0)
e0fd37ec
RG
189 print(expectedResponse)
190 print(receivedResponse)
191 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
192 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
193 self.assertEquals(receivedResponse.payload, 4242)
ec5f5c6b
RG
194
195 def testRegexReturnsRefused(self):
196 """
617dfe22
RG
197 Basics: Refuse query matching regex
198
ec5f5c6b
RG
199 dnsdist is configured to reply 'refused' for query
200 matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
201 We send a query for evil4242.powerdns.com
202 and check that the response is "refused".
203 """
204 name = 'evil4242.regex.tests.powerdns.com.'
205 query = dns.message.make_query(name, 'A', 'IN')
7af22479 206 query.flags &= ~dns.flags.RD
ec5f5c6b
RG
207 expectedResponse = dns.message.make_response(query)
208 expectedResponse.set_rcode(dns.rcode.REFUSED)
209
6ca2e796
RG
210 for method in ("sendUDPQuery", "sendTCPQuery"):
211 sender = getattr(self, method)
212 (_, receivedResponse) = sender(query, response=None, useQueue=False)
213 self.assertEquals(receivedResponse, expectedResponse)
ec5f5c6b 214
feb22f99 215 def testQNameReturnsSpoofed(self):
216 """
217 Basics: test QNameRule and Spoof
218
219 dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
220 """
221 name = 'ds9a.nl.'
222 query = dns.message.make_query(name, 'A', 'IN')
223 query.flags &= ~dns.flags.RD
224 expectedResponse = dns.message.make_response(query)
225 expectedResponse.set_rcode(dns.rcode.NOERROR)
226 rrset = dns.rrset.from_text(name,
227 3600,
228 dns.rdataclass.IN,
229 dns.rdatatype.A,
230 '1.2.3.4')
231 expectedResponse.answer.append(rrset)
232
6ca2e796
RG
233 for method in ("sendUDPQuery", "sendTCPQuery"):
234 sender = getattr(self, method)
235 (_, receivedResponse) = sender(query, response=None, useQueue=False)
236 self.assertEquals(receivedResponse, expectedResponse)
feb22f99 237
ec5f5c6b
RG
238 def testDomainAndQTypeReturnsNotImplemented(self):
239 """
617dfe22
RG
240 Basics: NOTIMPL for specific name and qtype
241
ec5f5c6b 242 dnsdist is configured to reply 'not implemented' for query
e7a1029c 243 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
ec5f5c6b
RG
244 We send a TXT query for "nameAndQtype.powerdns.com."
245 and check that the response is 'not implemented'.
246 """
247 name = 'nameAndQtype.tests.powerdns.com.'
248 query = dns.message.make_query(name, 'TXT', 'IN')
7af22479 249 query.flags &= ~dns.flags.RD
ec5f5c6b
RG
250 expectedResponse = dns.message.make_response(query)
251 expectedResponse.set_rcode(dns.rcode.NOTIMP)
252
6ca2e796
RG
253 for method in ("sendUDPQuery", "sendTCPQuery"):
254 sender = getattr(self, method)
255 (_, receivedResponse) = sender(query, response=None, useQueue=False)
256 self.assertEquals(receivedResponse, expectedResponse)
ec5f5c6b
RG
257
258 def testDomainWithoutQTypeIsNotAffected(self):
259 """
617dfe22
RG
260 Basics: NOTIMPL qtype canary
261
ec5f5c6b 262 dnsdist is configured to reply 'not implemented' for query
e7a1029c 263 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
ec5f5c6b
RG
264 We send a A query for "nameAndQtype.tests.powerdns.com."
265 and check that the response is OK.
266 """
267 name = 'nameAndQtype.tests.powerdns.com.'
268 query = dns.message.make_query(name, 'A', 'IN')
269 response = dns.message.make_response(query)
270 rrset = dns.rrset.from_text(name,
271 3600,
272 dns.rdataclass.IN,
273 dns.rdatatype.A,
274 '127.0.0.1')
275 response.answer.append(rrset)
276
6ca2e796
RG
277 for method in ("sendUDPQuery", "sendTCPQuery"):
278 sender = getattr(self, method)
279 (receivedQuery, receivedResponse) = sender(query, response)
280 self.assertTrue(receivedQuery)
281 self.assertTrue(receivedResponse)
282 receivedQuery.id = query.id
283 self.assertEquals(query, receivedQuery)
284 self.assertEquals(response, receivedResponse)
ec5f5c6b
RG
285
286 def testOtherDomainANDQTypeIsNotAffected(self):
287 """
617dfe22
RG
288 Basics: NOTIMPL qname canary
289
ec5f5c6b 290 dnsdist is configured to reply 'not implemented' for query
e7a1029c 291 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
ec5f5c6b
RG
292 We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
293 and check that the response is OK.
294 """
295 name = 'OtherNameAndQtype.tests.powerdns.com.'
296 query = dns.message.make_query(name, 'TXT', 'IN')
297 response = dns.message.make_response(query)
298 rrset = dns.rrset.from_text(name,
299 3600,
300 dns.rdataclass.IN,
301 dns.rdatatype.TXT,
302 'nothing to see here')
303 response.answer.append(rrset)
304
6ca2e796
RG
305 for method in ("sendUDPQuery", "sendTCPQuery"):
306 sender = getattr(self, method)
307 (receivedQuery, receivedResponse) = sender(query, response)
308 self.assertTrue(receivedQuery)
309 self.assertTrue(receivedResponse)
310 receivedQuery.id = query.id
311 self.assertEquals(query, receivedQuery)
312 self.assertEquals(response, receivedResponse)
ec5f5c6b 313
7267213a
RG
314 def testWrongResponse(self):
315 """
316 Basics: Unrelated response from the backend
317
318 The backend send an unrelated answer over UDP, it should
319 be discarded by dnsdist. It could happen if we wrap around
de4896ba 320 maxOutstanding queries too quickly or have more than maxOutstanding
7267213a
RG
321 queries to a specific backend in the air over UDP,
322 but does not really make sense over TCP.
323 """
324 name = 'query.unrelated.tests.powerdns.com.'
325 unrelatedName = 'answer.unrelated.tests.powerdns.com.'
326 query = dns.message.make_query(name, 'TXT', 'IN')
327 unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
328 unrelatedResponse = dns.message.make_response(unrelatedQuery)
329 rrset = dns.rrset.from_text(unrelatedName,
330 3600,
331 dns.rdataclass.IN,
332 dns.rdatatype.TXT,
333 'nothing to see here')
334 unrelatedResponse.answer.append(rrset)
335
6ca2e796
RG
336 for method in ("sendUDPQuery", "sendTCPQuery"):
337 sender = getattr(self, method)
338 (receivedQuery, receivedResponse) = sender(query, unrelatedResponse)
339 self.assertTrue(receivedQuery)
340 self.assertEquals(receivedResponse, None)
341 receivedQuery.id = query.id
342 self.assertEquals(query, receivedQuery)
f87c4aff 343
c8c3d4e4
RG
344 def testHeaderOnlyRefused(self):
345 """
346 Basics: Header-only refused response
347 """
348 name = 'header-only-refused-response.tests.powerdns.com.'
349 query = dns.message.make_query(name, 'A', 'IN')
350 response = dns.message.make_response(query)
351 response.set_rcode(dns.rcode.REFUSED)
352 response.question = []
353
6ca2e796
RG
354 for method in ("sendUDPQuery", "sendTCPQuery"):
355 sender = getattr(self, method)
356 (receivedQuery, receivedResponse) = sender(query, response)
357 self.assertTrue(receivedQuery)
358 receivedQuery.id = query.id
359 self.assertEquals(query, receivedQuery)
360 self.assertEquals(receivedResponse, response)
c8c3d4e4
RG
361
362 def testHeaderOnlyNoErrorResponse(self):
363 """
364 Basics: Header-only NoError response should be dropped
365 """
366 name = 'header-only-noerror-response.tests.powerdns.com.'
367 query = dns.message.make_query(name, 'A', 'IN')
368 response = dns.message.make_response(query)
369 response.question = []
370
6ca2e796
RG
371 for method in ("sendUDPQuery", "sendTCPQuery"):
372 sender = getattr(self, method)
373 (receivedQuery, receivedResponse) = sender(query, response)
374 self.assertTrue(receivedQuery)
375 receivedQuery.id = query.id
376 self.assertEquals(query, receivedQuery)
377 self.assertEquals(receivedResponse, None)
c8c3d4e4
RG
378
379 def testHeaderOnlyNXDResponse(self):
380 """
381 Basics: Header-only NXD response should be dropped
382 """
383 name = 'header-only-nxd-response.tests.powerdns.com.'
384 query = dns.message.make_query(name, 'A', 'IN')
385 response = dns.message.make_response(query)
386 response.set_rcode(dns.rcode.NXDOMAIN)
387 response.question = []
388
6ca2e796
RG
389 for method in ("sendUDPQuery", "sendTCPQuery"):
390 sender = getattr(self, method)
391 (receivedQuery, receivedResponse) = sender(query, response)
392 self.assertTrue(receivedQuery)
393 receivedQuery.id = query.id
394 self.assertEquals(query, receivedQuery)
395 self.assertEquals(receivedResponse, None)
c8c3d4e4 396
f850b032
PL
397 def testAddActionDNSName(self):
398 """
399 Basics: test if addAction accepts a DNSName
400 """
401 name = 'dnsname.addaction.powerdns.com.'
402 query = dns.message.make_query(name, 'A', 'IN')
7af22479 403 query.flags &= ~dns.flags.RD
f850b032
PL
404 expectedResponse = dns.message.make_response(query)
405 expectedResponse.set_rcode(dns.rcode.REFUSED)
406
6ca2e796
RG
407 for method in ("sendUDPQuery", "sendTCPQuery"):
408 sender = getattr(self, method)
409 (_, receivedResponse) = sender(query, response=None, useQueue=False)
410 self.assertEquals(receivedResponse, expectedResponse)
f850b032
PL
411
412 def testAddActionDNSNames(self):
413 """
414 Basics: test if addAction accepts a table of DNSNames
415 """
416 for name in ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1,2)]:
417 query = dns.message.make_query(name, 'A', 'IN')
7af22479 418 query.flags &= ~dns.flags.RD
f850b032
PL
419 expectedResponse = dns.message.make_response(query)
420 expectedResponse.set_rcode(dns.rcode.REFUSED)
421
6ca2e796
RG
422 for method in ("sendUDPQuery", "sendTCPQuery"):
423 sender = getattr(self, method)
424 (_, receivedResponse) = sender(query, response=None, useQueue=False)
425 self.assertEquals(receivedResponse, expectedResponse)
ca404e94 426