]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
Merge pull request #7343 from rgacogne/dnsdist-dyngroup-smt
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
1 #!/usr/bin/env python
2 import unittest
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
class TestBasics(DNSDistTest):
    """
    Basic dnsdist regression tests.

    Each test builds a DNS query, sends it through dnsdist with the
    sendUDPQuery() / sendTCPQuery() helpers inherited from DNSDistTest and
    checks what comes back. Both helpers return a
    (receivedQuery, receivedResponse) tuple; when the rule under test is
    expected to answer (or drop) the query inside dnsdist itself, the tests
    call them with response=None and useQueue=False and ignore the first
    element of the tuple.
    """

    # dnsdist (Lua) configuration used by every test in this class.
    # NOTE(review): the "%s" placeholder is presumably filled in with the
    # test backend port by DNSDistTest -- confirm against dnsdisttests.py.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    truncateTC(true)
    addAction(AndRule{QTypeRule(dnsdist.ANY), TCPRule(false)}, TCAction())
    addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
    mySMN = newSuffixMatchNode()
    mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
    addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
    addAction(makeRule("drop.test.powerdns.com."), DropAction())
    addAction(AndRule({QTypeRule(dnsdist.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
    addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
    addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
    """

    def testDropped(self):
        """
        Basics: Dropped query

        Send an A query for drop.test.powerdns.com. domain,
        which is dropped by configuration. We expect
        no response.
        """
        name = 'drop.test.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

    def testAWithECS(self):
        """
        Basics: A query with an ECS value
        """
        name = 'awithecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        # The ID of the query seen by the backend is not compared:
        # align it with ours before checking the rest of the message.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testSimpleA(self):
        """
        Basics: A query without EDNS
        """
        name = 'simplea.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testAnyIsTruncated(self):
        """
        Basics: Truncate ANY query

        dnsdist is configured to reply with TC to ANY queries,
        send an ANY query and check the result.
        It should be truncated over UDP, not over TCP.
        """
        name = 'any.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        # Over UDP the TC answer is expected without any backend response.
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # Over TCP the rule does not match (TCPRule(false)), so the query
        # must reach the backend and its answer must come back untouched.
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testTruncateTC(self):
        """
        Basics: Truncate TC

        dnsdist is configured to truncate TC responses (truncateTC(true)),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        response.flags |= dns.flags.TC

        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(expectedResponse.flags, receivedResponse.flags)
        self.assertEqual(expectedResponse.question, receivedResponse.question)
        # The record sent by the backend must have been stripped.
        self.assertFalse(response.answer == receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)
        self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def testTruncateTCEDNS(self):
        """
        Basics: Truncate TC with EDNS

        dnsdist is configured to truncate TC responses (truncateTC(true)),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        Note that the query and initial response had EDNS,
        so the final response should have it too.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        response = dns.message.make_response(query)
        # force a different responder payload than the one in the query,
        # so we check that we don't just mirror it
        response.payload = 4242
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        response.flags |= dns.flags.TC

        expectedResponse = dns.message.make_response(query)
        expectedResponse.payload = 4242
        expectedResponse.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response.flags, receivedResponse.flags)
        self.assertEqual(response.question, receivedResponse.question)
        # The record sent by the backend must have been stripped.
        self.assertFalse(response.answer == receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)
        self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
        self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
        self.assertEqual(receivedResponse.payload, 4242)

    def testRegexReturnsRefused(self):
        """
        Basics: Refuse query matching regex

        dnsdist is configured to reply 'refused' for query
        matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
        We send a query for evil4242.regex.tests.powerdns.com
        and check that the response is "refused".
        """
        name = 'evil4242.regex.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testQNameReturnsSpoofed(self):
        """
        Basics: test QNameRule and Spoof

        dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
        """
        name = 'ds9a.nl.'
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear the RD bit on the query before building the expected answer.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOERROR)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDomainAndQTypeReturnsNotImplemented(self):
        """
        Basics: NOTIMPL for specific name and qtype

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "nameAndQtype.tests.powerdns.com."
        and check that the response is 'not implemented'.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDomainWithoutQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qtype canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a A query for "nameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testOtherDomainANDQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qname canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'OtherNameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testWrongResponse(self):
        """
        Basics: Unrelated response from the backend

        The backend sends an unrelated answer over UDP, it should
        be discarded by dnsdist. It could happen if we wrap around
        maxOutstanding queries too quickly or have more than maxOutstanding
        queries to a specific backend in the air over UDP,
        but does not really make sense over TCP.
        The same check is nevertheless performed over TCP as well.
        """
        name = 'query.unrelated.tests.powerdns.com.'
        unrelatedName = 'answer.unrelated.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
        unrelatedResponse = dns.message.make_response(unrelatedQuery)
        rrset = dns.rrset.from_text(unrelatedName,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        unrelatedResponse.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, unrelatedResponse)
        self.assertTrue(receivedQuery)
        self.assertIsNone(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, unrelatedResponse)
        self.assertTrue(receivedQuery)
        self.assertIsNone(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)

    def testHeaderOnlyRefused(self):
        """
        Basics: Header-only refused response
        """
        name = 'header-only-refused-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.REFUSED)
        # Drop the question section so that only the header is left.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testHeaderOnlyNoErrorResponse(self):
        """
        Basics: Header-only NoError response should be dropped
        """
        name = 'header-only-noerror-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # Drop the question section so that only the header is left.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

    def testHeaderOnlyNXDResponse(self):
        """
        Basics: Header-only NXD response should be dropped
        """
        name = 'header-only-nxd-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.NXDOMAIN)
        # Drop the question section so that only the header is left.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

    def testAddActionDNSName(self):
        """
        Basics: test if addAction accepts a DNSName
        """
        name = 'dnsname.addaction.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testAddActionDNSNames(self):
        """
        Basics: test if addAction accepts a table of DNSNames
        """
        # The configuration registers dnsname-table1 and dnsname-table2:
        # range(1, 3) covers both (the upper bound is exclusive).
        for name in ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1, 3)]:
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
456
if __name__ == '__main__':
    # unittest.main() runs the tests and, by default, calls sys.exit() itself
    # with a status reflecting the outcome, so no explicit exit call is
    # needed (and nothing placed after it would be reached).
    unittest.main()