]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Basics.py
Merge pull request #7594 from rgacogne/dnsdist-set-rules
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
CommitLineData
ca404e94 1#!/usr/bin/env python
ca404e94 2import unittest
b1bec9f0
RG
3import dns
4import clientsubnetoption
ca404e94
RG
5from dnsdisttests import DNSDistTest
6
class TestBasics(DNSDistTest):
    """Basic dnsdist regression tests.

    Covers plain forwarding (with and without EDNS), TC handling, simple
    rule/action pairs (drop, refuse, not-implemented, spoof) and the
    handling of unrelated or header-only backend responses.

    The query helpers (sendUDPQuery / sendTCPQuery) and the EDNS checkers
    are inherited from DNSDistTest. As used here, the send helpers return a
    (receivedQuery, receivedResponse) pair, where receivedResponse is None
    when no answer came back.
    """

    # dnsdist (Lua) configuration used by this test class. The single %s
    # placeholder is presumably filled in with the test backend's port by
    # DNSDistTest -- confirm against the base class.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    truncateTC(true)
    addAction(AndRule{QTypeRule(dnsdist.ANY), TCPRule(false)}, TCAction())
    addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
    mySMN = newSuffixMatchNode()
    mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
    addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
    addAction(makeRule("drop.test.powerdns.com."), DropAction())
    addAction(AndRule({QTypeRule(dnsdist.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
    addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
    addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
    """

    def testDropped(self):
        """
        Basics: Dropped query

        Send an A query for drop.test.powerdns.com. domain,
        which is dropped by configuration. We expect
        no response.
        """
        name = 'drop.test.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

    def testAWithECS(self):
        """
        Basics: A query with an ECS value
        """
        name = 'awithecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        # Align the IDs before comparing: the ID seen by the backend is not
        # expected to match the one we sent (presumably dnsdist assigns its
        # own when forwarding -- confirm).
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testSimpleA(self):
        """
        Basics: A query without EDNS
        """
        name = 'simplea.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testAnyIsTruncated(self):
        """
        Basics: Truncate ANY query

        dnsdist is configured to reply with TC to ANY queries,
        send an ANY query and check the result.
        It should be truncated over UDP, not over TCP.
        """
        name = 'any.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        # Over UDP the TC answer is checked without supplying a backend
        # response (response=None, useQueue=False).
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)

        # Over TCP the rule does not match (TCPRule(false)), so the query
        # must reach the backend and its answer must come back untouched.
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testTruncateTC(self):
        """
        Basics: Truncate TC

        dnsdist is configured to truncate TC (default),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)
        response.flags |= dns.flags.TC
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(expectedResponse.flags, receivedResponse.flags)
        self.assertEqual(expectedResponse.question, receivedResponse.question)
        # The answer record sent by the backend must have been stripped.
        self.assertFalse(response.answer == receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)
        self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def testTruncateTCEDNS(self):
        """
        Basics: Truncate TC with EDNS

        dnsdist is configured to truncate TC (default),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        Note that the query and initial response had EDNS,
        so the final response should have it too.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        response = dns.message.make_response(query)
        # force a different responder payload than the one in the query,
        # so we check that we don't just mirror it
        response.payload = 4242
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)
        response.flags |= dns.flags.TC
        expectedResponse = dns.message.make_response(query)
        expectedResponse.payload = 4242
        expectedResponse.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        # expectedResponse carries the same flags and question as the
        # backend response; compare against it, as testTruncateTC does.
        self.assertEqual(expectedResponse.flags, receivedResponse.flags)
        self.assertEqual(expectedResponse.question, receivedResponse.question)
        # The answer record sent by the backend must have been stripped.
        self.assertFalse(response.answer == receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)
        self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
        self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
        self.assertEqual(receivedResponse.payload, 4242)

    def testRegexReturnsRefused(self):
        """
        Basics: Refuse query matching regex

        dnsdist is configured to reply 'refused' for query
        matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
        We send a query for evil4242.regex.tests.powerdns.com
        and check that the response is "refused".
        """
        name = 'evil4242.regex.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testQNameReturnsSpoofed(self):
        """
        Basics: test QNameRule and Spoof

        dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
        """
        name = 'ds9a.nl.'
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear the RD bit on the query; the expected response is derived
        # from this modified query.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOERROR)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDomainAndQTypeReturnsNotImplemented(self):
        """
        Basics: NOTIMPL for specific name and qtype

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "nameAndQtype.tests.powerdns.com."
        and check that the response is 'not implemented'.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDomainWithoutQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qtype canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a A query for "nameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testOtherDomainANDQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qname canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'OtherNameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testWrongResponse(self):
        """
        Basics: Unrelated response from the backend

        The backend sends an unrelated answer over UDP, it should
        be discarded by dnsdist. It could happen if we wrap around
        maxOutstanding queries too quickly or have more than maxOutstanding
        queries to a specific backend in the air over UDP,
        but does not really make sense over TCP.
        """
        name = 'query.unrelated.tests.powerdns.com.'
        unrelatedName = 'answer.unrelated.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
        unrelatedResponse = dns.message.make_response(unrelatedQuery)
        rrset = dns.rrset.from_text(unrelatedName,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        unrelatedResponse.answer.append(rrset)

        # The query must reach the backend, but the mismatching answer must
        # not be relayed to the client.
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, unrelatedResponse)
        self.assertTrue(receivedQuery)
        self.assertIsNone(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, unrelatedResponse)
        self.assertTrue(receivedQuery)
        self.assertIsNone(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)

    def testHeaderOnlyRefused(self):
        """
        Basics: Header-only refused response
        """
        name = 'header-only-refused-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.REFUSED)
        # Drop the question section so the backend answer is header-only.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testHeaderOnlyNoErrorResponse(self):
        """
        Basics: Header-only NoError response should be dropped
        """
        name = 'header-only-noerror-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # Drop the question section so the backend answer is header-only.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

    def testHeaderOnlyNXDResponse(self):
        """
        Basics: Header-only NXD response should be dropped
        """
        name = 'header-only-nxd-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.NXDOMAIN)
        # Drop the question section so the backend answer is header-only.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

    def testAddActionDNSName(self):
        """
        Basics: test if addAction accepts a DNSName
        """
        name = 'dnsname.addaction.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testAddActionDNSNames(self):
        """
        Basics: test if addAction accepts a table of DNSNames
        """
        # The configuration registers both dnsname-table1 and dnsname-table2,
        # so the range has to cover 1 and 2 (the upper bound is exclusive).
        for name in ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1, 3)]:
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
ca404e94
RG
456
# Script entry point: run every test case defined in this module.
# unittest.main() terminates the interpreter itself (it calls sys.exit()
# with the test result), so no explicit exit call is needed after it.
if __name__ == '__main__':
    unittest.main()