]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
dnsdist: Support setting the value of AA, AD and RA when spoofing
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 from dnsdisttests import DNSDistTest
5
6 class TestBasics(DNSDistTest):
7
8 _config_template = """
9 newServer{address="127.0.0.1:%s"}
10 truncateTC(true)
11 addAction(AndRule{QTypeRule(DNSQType.ANY), TCPRule(false)}, TCAction())
12 addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
13 mySMN = newSuffixMatchNode()
14 mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
15 addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
16 addAction(makeRule("drop.test.powerdns.com."), DropAction())
17 addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
18 addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
19 addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
20 """
21
22 def testDropped(self):
23 """
24 Basics: Dropped query
25
26 Send an A query for drop.test.powerdns.com. domain,
27 which is dropped by configuration. We expect
28 no response.
29 """
30 name = 'drop.test.powerdns.com.'
31 query = dns.message.make_query(name, 'A', 'IN')
32 for method in ("sendUDPQuery", "sendTCPQuery"):
33 sender = getattr(self, method)
34 (_, receivedResponse) = sender(query, response=None, useQueue=False)
35 self.assertEquals(receivedResponse, None)
36
37 def testAWithECS(self):
38 """
39 Basics: A query with an ECS value
40 """
41 name = 'awithecs.tests.powerdns.com.'
42 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
43 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
44 response = dns.message.make_response(query)
45 rrset = dns.rrset.from_text(name,
46 60,
47 dns.rdataclass.IN,
48 dns.rdatatype.A,
49 '127.0.0.1')
50
51 response.answer.append(rrset)
52
53 for method in ("sendUDPQuery", "sendTCPQuery"):
54 sender = getattr(self, method)
55 (receivedQuery, receivedResponse) = sender(query, response)
56 receivedQuery.id = query.id
57 self.assertEquals(query, receivedQuery)
58 self.assertEquals(response, receivedResponse)
59
60 def testSimpleA(self):
61 """
62 Basics: A query without EDNS
63 """
64 name = 'simplea.tests.powerdns.com.'
65 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
66 response = dns.message.make_response(query)
67 rrset = dns.rrset.from_text(name,
68 3600,
69 dns.rdataclass.IN,
70 dns.rdatatype.A,
71 '127.0.0.1')
72 response.answer.append(rrset)
73
74 for method in ("sendUDPQuery", "sendTCPQuery"):
75 sender = getattr(self, method)
76 (receivedQuery, receivedResponse) = sender(query, response)
77 self.assertTrue(receivedQuery)
78 self.assertTrue(receivedResponse)
79 receivedQuery.id = query.id
80 self.assertEquals(query, receivedQuery)
81 self.assertEquals(response, receivedResponse)
82
83 def testAnyIsTruncated(self):
84 """
85 Basics: Truncate ANY query
86
87 dnsdist is configured to reply with TC to ANY queries,
88 send an ANY query and check the result.
89 It should be truncated over UDP, not over TCP.
90 """
91 name = 'any.tests.powerdns.com.'
92 query = dns.message.make_query(name, 'ANY', 'IN')
93 # dnsdist sets RA = RD for TC responses
94 query.flags &= ~dns.flags.RD
95 expectedResponse = dns.message.make_response(query)
96 expectedResponse.flags |= dns.flags.TC
97
98 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
99 self.assertEquals(receivedResponse, expectedResponse)
100
101 response = dns.message.make_response(query)
102 rrset = dns.rrset.from_text(name,
103 3600,
104 dns.rdataclass.IN,
105 dns.rdatatype.A,
106 '127.0.0.1')
107
108 response.answer.append(rrset)
109
110 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
111 self.assertTrue(receivedQuery)
112 self.assertTrue(receivedResponse)
113 receivedQuery.id = query.id
114 self.assertEquals(query, receivedQuery)
115 self.assertEquals(receivedResponse, response)
116
117 def testTruncateTC(self):
118 """
119 Basics: Truncate TC
120
121 dnsdist is configured to truncate TC (default),
122 we make the backend send responses
123 with TC set and additional content,
124 and check that the received response has been fixed.
125 """
126 name = 'atruncatetc.tests.powerdns.com.'
127 query = dns.message.make_query(name, 'A', 'IN')
128 response = dns.message.make_response(query)
129 rrset = dns.rrset.from_text(name,
130 3600,
131 dns.rdataclass.IN,
132 dns.rdatatype.A,
133 '127.0.0.1')
134
135 response.answer.append(rrset)
136 response.flags |= dns.flags.TC
137 expectedResponse = dns.message.make_response(query)
138 expectedResponse.flags |= dns.flags.TC
139
140 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
141 receivedQuery.id = query.id
142 self.assertEquals(query, receivedQuery)
143 self.assertEquals(expectedResponse.flags, receivedResponse.flags)
144 self.assertEquals(expectedResponse.question, receivedResponse.question)
145 self.assertFalse(response.answer == receivedResponse.answer)
146 self.assertEquals(len(receivedResponse.answer), 0)
147 self.assertEquals(len(receivedResponse.authority), 0)
148 self.assertEquals(len(receivedResponse.additional), 0)
149 self.checkMessageNoEDNS(expectedResponse, receivedResponse)
150
151 def testTruncateTCEDNS(self):
152 """
153 Basics: Truncate TC with EDNS
154
155 dnsdist is configured to truncate TC (default),
156 we make the backend send responses
157 with TC set and additional content,
158 and check that the received response has been fixed.
159 Note that the query and initial response had EDNS,
160 so the final response should have it too.
161 """
162 name = 'atruncatetc.tests.powerdns.com.'
163 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
164 response = dns.message.make_response(query)
165 # force a different responder payload than the one in the query,
166 # so we check that we don't just mirror it
167 response.payload = 4242
168 rrset = dns.rrset.from_text(name,
169 3600,
170 dns.rdataclass.IN,
171 dns.rdatatype.A,
172 '127.0.0.1')
173
174 response.answer.append(rrset)
175 response.flags |= dns.flags.TC
176 expectedResponse = dns.message.make_response(query)
177 expectedResponse.payload = 4242
178 expectedResponse.flags |= dns.flags.TC
179
180 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
181 receivedQuery.id = query.id
182 self.assertEquals(query, receivedQuery)
183 self.assertEquals(response.flags, receivedResponse.flags)
184 self.assertEquals(response.question, receivedResponse.question)
185 self.assertFalse(response.answer == receivedResponse.answer)
186 self.assertEquals(len(receivedResponse.answer), 0)
187 self.assertEquals(len(receivedResponse.authority), 0)
188 self.assertEquals(len(receivedResponse.additional), 0)
189 print(expectedResponse)
190 print(receivedResponse)
191 self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
192 self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
193 self.assertEquals(receivedResponse.payload, 4242)
194
195 def testRegexReturnsRefused(self):
196 """
197 Basics: Refuse query matching regex
198
199 dnsdist is configured to reply 'refused' for query
200 matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
201 We send a query for evil4242.powerdns.com
202 and check that the response is "refused".
203 """
204 name = 'evil4242.regex.tests.powerdns.com.'
205 query = dns.message.make_query(name, 'A', 'IN')
206 expectedResponse = dns.message.make_response(query)
207 expectedResponse.set_rcode(dns.rcode.REFUSED)
208
209 for method in ("sendUDPQuery", "sendTCPQuery"):
210 sender = getattr(self, method)
211 (_, receivedResponse) = sender(query, response=None, useQueue=False)
212 self.assertEquals(receivedResponse, expectedResponse)
213
214 def testQNameReturnsSpoofed(self):
215 """
216 Basics: test QNameRule and Spoof
217
218 dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
219 """
220 name = 'ds9a.nl.'
221 query = dns.message.make_query(name, 'A', 'IN')
222 query.flags &= ~dns.flags.RD
223 expectedResponse = dns.message.make_response(query)
224 expectedResponse.set_rcode(dns.rcode.NOERROR)
225 rrset = dns.rrset.from_text(name,
226 3600,
227 dns.rdataclass.IN,
228 dns.rdatatype.A,
229 '1.2.3.4')
230 expectedResponse.answer.append(rrset)
231
232 for method in ("sendUDPQuery", "sendTCPQuery"):
233 sender = getattr(self, method)
234 (_, receivedResponse) = sender(query, response=None, useQueue=False)
235 self.assertEquals(receivedResponse, expectedResponse)
236
237 def testDomainAndQTypeReturnsNotImplemented(self):
238 """
239 Basics: NOTIMPL for specific name and qtype
240
241 dnsdist is configured to reply 'not implemented' for query
242 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
243 We send a TXT query for "nameAndQtype.powerdns.com."
244 and check that the response is 'not implemented'.
245 """
246 name = 'nameAndQtype.tests.powerdns.com.'
247 query = dns.message.make_query(name, 'TXT', 'IN')
248 expectedResponse = dns.message.make_response(query)
249 expectedResponse.set_rcode(dns.rcode.NOTIMP)
250
251 for method in ("sendUDPQuery", "sendTCPQuery"):
252 sender = getattr(self, method)
253 (_, receivedResponse) = sender(query, response=None, useQueue=False)
254 self.assertEquals(receivedResponse, expectedResponse)
255
256 def testDomainWithoutQTypeIsNotAffected(self):
257 """
258 Basics: NOTIMPL qtype canary
259
260 dnsdist is configured to reply 'not implemented' for query
261 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
262 We send a A query for "nameAndQtype.tests.powerdns.com."
263 and check that the response is OK.
264 """
265 name = 'nameAndQtype.tests.powerdns.com.'
266 query = dns.message.make_query(name, 'A', 'IN')
267 response = dns.message.make_response(query)
268 rrset = dns.rrset.from_text(name,
269 3600,
270 dns.rdataclass.IN,
271 dns.rdatatype.A,
272 '127.0.0.1')
273 response.answer.append(rrset)
274
275 for method in ("sendUDPQuery", "sendTCPQuery"):
276 sender = getattr(self, method)
277 (receivedQuery, receivedResponse) = sender(query, response)
278 self.assertTrue(receivedQuery)
279 self.assertTrue(receivedResponse)
280 receivedQuery.id = query.id
281 self.assertEquals(query, receivedQuery)
282 self.assertEquals(response, receivedResponse)
283
284 def testOtherDomainANDQTypeIsNotAffected(self):
285 """
286 Basics: NOTIMPL qname canary
287
288 dnsdist is configured to reply 'not implemented' for query
289 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
290 We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
291 and check that the response is OK.
292 """
293 name = 'OtherNameAndQtype.tests.powerdns.com.'
294 query = dns.message.make_query(name, 'TXT', 'IN')
295 response = dns.message.make_response(query)
296 rrset = dns.rrset.from_text(name,
297 3600,
298 dns.rdataclass.IN,
299 dns.rdatatype.TXT,
300 'nothing to see here')
301 response.answer.append(rrset)
302
303 for method in ("sendUDPQuery", "sendTCPQuery"):
304 sender = getattr(self, method)
305 (receivedQuery, receivedResponse) = sender(query, response)
306 self.assertTrue(receivedQuery)
307 self.assertTrue(receivedResponse)
308 receivedQuery.id = query.id
309 self.assertEquals(query, receivedQuery)
310 self.assertEquals(response, receivedResponse)
311
312 def testWrongResponse(self):
313 """
314 Basics: Unrelated response from the backend
315
316 The backend send an unrelated answer over UDP, it should
317 be discarded by dnsdist. It could happen if we wrap around
318 maxOutstanding queries too quickly or have more than maxOutstanding
319 queries to a specific backend in the air over UDP,
320 but does not really make sense over TCP.
321 """
322 name = 'query.unrelated.tests.powerdns.com.'
323 unrelatedName = 'answer.unrelated.tests.powerdns.com.'
324 query = dns.message.make_query(name, 'TXT', 'IN')
325 unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
326 unrelatedResponse = dns.message.make_response(unrelatedQuery)
327 rrset = dns.rrset.from_text(unrelatedName,
328 3600,
329 dns.rdataclass.IN,
330 dns.rdatatype.TXT,
331 'nothing to see here')
332 unrelatedResponse.answer.append(rrset)
333
334 for method in ("sendUDPQuery", "sendTCPQuery"):
335 sender = getattr(self, method)
336 (receivedQuery, receivedResponse) = sender(query, unrelatedResponse)
337 self.assertTrue(receivedQuery)
338 self.assertEquals(receivedResponse, None)
339 receivedQuery.id = query.id
340 self.assertEquals(query, receivedQuery)
341
342 def testHeaderOnlyRefused(self):
343 """
344 Basics: Header-only refused response
345 """
346 name = 'header-only-refused-response.tests.powerdns.com.'
347 query = dns.message.make_query(name, 'A', 'IN')
348 response = dns.message.make_response(query)
349 response.set_rcode(dns.rcode.REFUSED)
350 response.question = []
351
352 for method in ("sendUDPQuery", "sendTCPQuery"):
353 sender = getattr(self, method)
354 (receivedQuery, receivedResponse) = sender(query, response)
355 self.assertTrue(receivedQuery)
356 receivedQuery.id = query.id
357 self.assertEquals(query, receivedQuery)
358 self.assertEquals(receivedResponse, response)
359
360 def testHeaderOnlyNoErrorResponse(self):
361 """
362 Basics: Header-only NoError response should be dropped
363 """
364 name = 'header-only-noerror-response.tests.powerdns.com.'
365 query = dns.message.make_query(name, 'A', 'IN')
366 response = dns.message.make_response(query)
367 response.question = []
368
369 for method in ("sendUDPQuery", "sendTCPQuery"):
370 sender = getattr(self, method)
371 (receivedQuery, receivedResponse) = sender(query, response)
372 self.assertTrue(receivedQuery)
373 receivedQuery.id = query.id
374 self.assertEquals(query, receivedQuery)
375 self.assertEquals(receivedResponse, None)
376
377 def testHeaderOnlyNXDResponse(self):
378 """
379 Basics: Header-only NXD response should be dropped
380 """
381 name = 'header-only-nxd-response.tests.powerdns.com.'
382 query = dns.message.make_query(name, 'A', 'IN')
383 response = dns.message.make_response(query)
384 response.set_rcode(dns.rcode.NXDOMAIN)
385 response.question = []
386
387 for method in ("sendUDPQuery", "sendTCPQuery"):
388 sender = getattr(self, method)
389 (receivedQuery, receivedResponse) = sender(query, response)
390 self.assertTrue(receivedQuery)
391 receivedQuery.id = query.id
392 self.assertEquals(query, receivedQuery)
393 self.assertEquals(receivedResponse, None)
394
395 def testAddActionDNSName(self):
396 """
397 Basics: test if addAction accepts a DNSName
398 """
399 name = 'dnsname.addaction.powerdns.com.'
400 query = dns.message.make_query(name, 'A', 'IN')
401 expectedResponse = dns.message.make_response(query)
402 expectedResponse.set_rcode(dns.rcode.REFUSED)
403
404 for method in ("sendUDPQuery", "sendTCPQuery"):
405 sender = getattr(self, method)
406 (_, receivedResponse) = sender(query, response=None, useQueue=False)
407 self.assertEquals(receivedResponse, expectedResponse)
408
409 def testAddActionDNSNames(self):
410 """
411 Basics: test if addAction accepts a table of DNSNames
412 """
413 for name in ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1,2)]:
414 query = dns.message.make_query(name, 'A', 'IN')
415 expectedResponse = dns.message.make_response(query)
416 expectedResponse.set_rcode(dns.rcode.REFUSED)
417
418 for method in ("sendUDPQuery", "sendTCPQuery"):
419 sender = getattr(self, method)
420 (_, receivedResponse) = sender(query, response=None, useQueue=False)
421 self.assertEquals(receivedResponse, expectedResponse)
422