]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
Merge pull request #7702 from rgacogne/dnsdist-static-fixes
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
1 #!/usr/bin/env python
2 import unittest
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
7 class TestBasics(DNSDistTest):
8
# Lua configuration used for the dnsdist instance under test.
# The "%s" in newServer{} is a placeholder for the backend port
# (presumably substituted by DNSDistTest before dnsdist starts -- confirm).
# Rules installed, each one exercised by a test of this class:
#   - truncateTC(true): strip the content of backend responses carrying TC
#   - ANY queries not received over TCP are answered with TC set
#   - names matching evil[0-9]{4,}.regex.tests.powerdns.com are REFUSED
#   - TXT queries under nameAndQtype.tests.powerdns.com. get NOTIMP
#   - queries for drop.test.powerdns.com. are dropped
#   - A queries for exactly ds9a.nl are spoofed with 1.2.3.4
#   - a single DNSName, and a table of DNSNames, passed directly to
#     addAction() are REFUSED
_config_template = """
newServer{address="127.0.0.1:%s"}
truncateTC(true)
addAction(AndRule{QTypeRule(DNSQType.ANY), TCPRule(false)}, TCAction())
addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
mySMN = newSuffixMatchNode()
mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
addAction(makeRule("drop.test.powerdns.com."), DropAction())
addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
"""
22
def testDropped(self):
    """
    Basics: Dropped query

    Send an A query for drop.test.powerdns.com. domain,
    which is dropped by configuration. We expect
    no response.
    """
    name = 'drop.test.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # Same expectation over both transports; no backend response is
    # supplied (response=None, useQueue=False).
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)
37
def testAWithECS(self):
    """
    Basics: A query with an ECS value

    The query carries an EDNS Client Subnet option; both the query
    seen by the backend and the response seen by the client must be
    identical to what was sent.
    """
    name = 'awithecs.tests.powerdns.com.'
    ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
    query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
    response = dns.message.make_response(query)
    rrset = dns.rrset.from_text(name,
                                60,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '127.0.0.1')
    response.answer.append(rrset)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        # Fail with a clear assertion (as the sibling tests do) rather than
        # an AttributeError on None when nothing came through.
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # The ID seen by the backend may differ from the client's one
        # (presumably rewritten by dnsdist), so align it before comparing.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
60
def testSimpleA(self):
    """
    Basics: A query without EDNS

    The query must reach the backend unchanged and the backend
    response must reach the client unchanged, over UDP and TCP.
    """
    name = 'simplea.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
    response = dns.message.make_response(query)
    rrset = dns.rrset.from_text(name,
                                3600,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '127.0.0.1')
    response.answer.append(rrset)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the ID before comparing the whole messages.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
83
def testAnyIsTruncated(self):
    """
    Basics: Truncate ANY query

    dnsdist is configured to reply with TC to ANY queries,
    send an ANY query and check the result.
    It should be truncated over UDP, not over TCP.
    """
    name = 'any.tests.powerdns.com.'
    query = dns.message.make_query(name, 'ANY', 'IN')

    # UDP: an empty response with the TC bit set is expected, and no
    # backend response is supplied.
    expectedResponse = dns.message.make_response(query)
    expectedResponse.flags |= dns.flags.TC

    (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
    self.assertEqual(receivedResponse, expectedResponse)

    # TCP: the query goes to the backend and its answer is relayed as is.
    response = dns.message.make_response(query)
    rrset = dns.rrset.from_text(name,
                                3600,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '127.0.0.1')
    response.answer.append(rrset)

    (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
    self.assertTrue(receivedQuery)
    self.assertTrue(receivedResponse)
    receivedQuery.id = query.id
    self.assertEqual(query, receivedQuery)
    self.assertEqual(receivedResponse, response)
115
def testTruncateTC(self):
    """
    Basics: Truncate TC

    dnsdist is configured to truncate TC (default),
    we make the backend send responses
    with TC set and additional content,
    and check that the received response has been fixed.
    """
    name = 'atruncatetc.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # Backend response: TC set but still carrying an answer record.
    response = dns.message.make_response(query)
    rrset = dns.rrset.from_text(name,
                                3600,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '127.0.0.1')
    response.answer.append(rrset)
    response.flags |= dns.flags.TC

    # What the client should see: TC set, no records at all.
    expectedResponse = dns.message.make_response(query)
    expectedResponse.flags |= dns.flags.TC

    (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
    self.assertTrue(receivedQuery)
    self.assertTrue(receivedResponse)
    receivedQuery.id = query.id
    self.assertEqual(query, receivedQuery)
    self.assertEqual(expectedResponse.flags, receivedResponse.flags)
    self.assertEqual(expectedResponse.question, receivedResponse.question)
    self.assertNotEqual(response.answer, receivedResponse.answer)
    self.assertEqual(len(receivedResponse.answer), 0)
    self.assertEqual(len(receivedResponse.authority), 0)
    self.assertEqual(len(receivedResponse.additional), 0)
    self.checkMessageNoEDNS(expectedResponse, receivedResponse)
149
def testTruncateTCEDNS(self):
    """
    Basics: Truncate TC with EDNS

    dnsdist is configured to truncate TC (default),
    we make the backend send responses
    with TC set and additional content,
    and check that the received response has been fixed.
    Note that the query and initial response had EDNS,
    so the final response should have it too.
    """
    name = 'atruncatetc.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
    response = dns.message.make_response(query)
    # force a different responder payload than the one in the query,
    # so we check that we don't just mirror it
    response.payload = 4242
    rrset = dns.rrset.from_text(name,
                                3600,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '127.0.0.1')
    response.answer.append(rrset)
    response.flags |= dns.flags.TC

    expectedResponse = dns.message.make_response(query)
    expectedResponse.payload = 4242
    expectedResponse.flags |= dns.flags.TC

    (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
    self.assertTrue(receivedQuery)
    self.assertTrue(receivedResponse)
    receivedQuery.id = query.id
    self.assertEqual(query, receivedQuery)
    self.assertEqual(response.flags, receivedResponse.flags)
    self.assertEqual(response.question, receivedResponse.question)
    self.assertNotEqual(response.answer, receivedResponse.answer)
    self.assertEqual(len(receivedResponse.answer), 0)
    self.assertEqual(len(receivedResponse.authority), 0)
    self.assertEqual(len(receivedResponse.additional), 0)
    self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
    # The DO bit must not be set in the truncated response, and the
    # payload size must be the backend's one, not the client's.
    self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
    self.assertEqual(receivedResponse.payload, 4242)
193
def testRegexReturnsRefused(self):
    """
    Basics: Refuse query matching regex

    dnsdist is configured to reply 'refused' for query
    matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
    We send a query for evil4242.regex.tests.powerdns.com.
    and check that the response is "refused".
    """
    name = 'evil4242.regex.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    # No backend response is supplied for either transport.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
212
def testQNameReturnsSpoofed(self):
    """
    Basics: test QNameRule and Spoof

    dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
    """
    name = 'ds9a.nl.'
    query = dns.message.make_query(name, 'A', 'IN')
    # Clear the RD flag on the query before building the expected response.
    query.flags &= ~dns.flags.RD
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.NOERROR)
    rrset = dns.rrset.from_text(name,
                                3600,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '1.2.3.4')
    expectedResponse.answer.append(rrset)

    # No backend response is supplied for either transport.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
235
def testDomainAndQTypeReturnsNotImplemented(self):
    """
    Basics: NOTIMPL for specific name and qtype

    dnsdist is configured to reply 'not implemented' for query
    matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
    We send a TXT query for "nameAndQtype.tests.powerdns.com."
    and check that the response is 'not implemented'.
    """
    name = 'nameAndQtype.tests.powerdns.com.'
    query = dns.message.make_query(name, 'TXT', 'IN')
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.NOTIMP)

    # No backend response is supplied for either transport.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
254
def testDomainWithoutQTypeIsNotAffected(self):
    """
    Basics: NOTIMPL qtype canary

    dnsdist is configured to reply 'not implemented' for query
    matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
    We send an A query for "nameAndQtype.tests.powerdns.com."
    and check that the response is OK.
    """
    name = 'nameAndQtype.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    rrset = dns.rrset.from_text(name,
                                3600,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '127.0.0.1')
    response.answer.append(rrset)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the ID before comparing the whole messages.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
282
def testOtherDomainANDQTypeIsNotAffected(self):
    """
    Basics: NOTIMPL qname canary

    dnsdist is configured to reply 'not implemented' for query
    matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
    We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
    and check that the response is OK.
    """
    name = 'OtherNameAndQtype.tests.powerdns.com.'
    query = dns.message.make_query(name, 'TXT', 'IN')
    response = dns.message.make_response(query)
    rrset = dns.rrset.from_text(name,
                                3600,
                                dns.rdataclass.IN,
                                dns.rdatatype.TXT,
                                'nothing to see here')
    response.answer.append(rrset)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the ID before comparing the whole messages.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
310
def testWrongResponse(self):
    """
    Basics: Unrelated response from the backend

    The backend sends an unrelated answer over UDP, it should
    be discarded by dnsdist. It could happen if we wrap around
    maxOutstanding queries too quickly or have more than maxOutstanding
    queries to a specific backend in the air over UDP,
    but does not really make sense over TCP.
    """
    name = 'query.unrelated.tests.powerdns.com.'
    unrelatedName = 'answer.unrelated.tests.powerdns.com.'
    query = dns.message.make_query(name, 'TXT', 'IN')

    # Build a well-formed response, but for a different question.
    unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
    unrelatedResponse = dns.message.make_response(unrelatedQuery)
    rrset = dns.rrset.from_text(unrelatedName,
                                3600,
                                dns.rdataclass.IN,
                                dns.rdatatype.TXT,
                                'nothing to see here')
    unrelatedResponse.answer.append(rrset)

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, unrelatedResponse)
        # The query must have reached the backend, but the mismatching
        # response must not have been relayed to the client.
        self.assertTrue(receivedQuery)
        self.assertIsNone(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
340
def testHeaderOnlyRefused(self):
    """
    Basics: Header-only refused response

    The backend answers REFUSED with an empty question section;
    that response is expected to be relayed to the client as is.
    """
    name = 'header-only-refused-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.REFUSED)
    # Drop the question so that only the header remains.
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)
358
def testHeaderOnlyNoErrorResponse(self):
    """
    Basics: Header-only NoError response should be dropped

    The backend answers NoError with an empty question section;
    the client is expected to receive nothing.
    """
    name = 'header-only-noerror-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    # Drop the question so that only the header remains.
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
375
def testHeaderOnlyNXDResponse(self):
    """
    Basics: Header-only NXD response should be dropped

    The backend answers NXDOMAIN with an empty question section;
    the client is expected to receive nothing.
    """
    name = 'header-only-nxd-response.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    response = dns.message.make_response(query)
    response.set_rcode(dns.rcode.NXDOMAIN)
    # Drop the question so that only the header remains.
    response.question = []

    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (receivedQuery, receivedResponse) = sender(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)
393
def testAddActionDNSName(self):
    """
    Basics: test if addAction accepts a DNSName

    The configuration passes a DNSName directly to addAction() with a
    REFUSED action; a query for that name must be refused.
    """
    name = 'dnsname.addaction.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    # No backend response is supplied for either transport.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)
407
def testAddActionDNSNames(self):
    """
    Basics: test if addAction accepts a table of DNSNames

    The configuration passes a table holding dnsname-table1 and
    dnsname-table2 to addAction() with a REFUSED action; a query for
    each of the two names must be refused.
    """
    # range(1, 3) so that both table1 and table2 are covered; the upper
    # bound of range() is exclusive.
    names = ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1, 3)]
    for name in names:
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # No backend response is supplied for either transport.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
421