git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
Merge pull request #5177 from rgacogne/outgoing-axfr-leak
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
1 #!/usr/bin/env python
2 import unittest
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
class TestBasics(DNSDistTest):
    """
    Basic regression tests for dnsdist.

    Each test builds a DNS query, sends it through the helpers inherited
    from DNSDistTest (sendUDPQuery / sendTCPQuery) and checks what comes
    back. Two call shapes are used throughout:

    * ``send*Query(query, response)`` returns ``(receivedQuery,
      receivedResponse)``; the tests compare the first against the query
      they sent and the second against the response they supplied.
      NOTE(review): presumably ``receivedQuery`` is what the test backend
      saw and ``response`` is what it answers with -- confirm in
      dnsdisttests.
    * ``send*Query(query, response=None, useQueue=False)`` is used when the
      query is expected to be answered or dropped by a rule in the
      configuration below; only the received response is inspected.
    """

    # Lua configuration for the dnsdist instance under test.
    # NOTE(review): the "%s" placeholder is presumably filled in with the
    # test backend port by DNSDistTest -- confirm in dnsdisttests.
    # The string is not raw, so "\\\\." reaches Lua as "\\.", i.e. an
    # escaped backslash inside the Lua string literal.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    truncateTC(true)
    addAnyTCRule()
    addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
    mySMN = newSuffixMatchNode()
    mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
    addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
    addAction(makeRule("drop.test.powerdns.com."), DropAction())
    addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
    addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
    block=newDNSName("powerdns.org.")
    function blockFilter(dq)
        if(dq.qname:isPartOf(block))
        then
            print("Blocking *.powerdns.org")
            return true
        end
        return false
    end
    """

    def testDropped(self):
        """
        Basics: Dropped query

        Send an A query for drop.test.powerdns.com. domain,
        which is dropped by configuration. We expect
        no response.
        """
        name = 'drop.test.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

    def testBlockedA(self):
        """
        Basics: Blocked A query

        Send an A query for the powerdns.org domain,
        which is blocked by configuration. We expect
        no response.
        """
        name = 'blockeda.tests.powerdns.org.'
        query = dns.message.make_query(name, 'A', 'IN')

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

    def testAWithECS(self):
        """
        Basics: A query with an ECS value
        """
        name = 'awithecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        # Fail with a clear assertion instead of an AttributeError on
        # `.id` below when nothing was received.
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the IDs so that the message comparison below is not
        # affected by an ID mismatch.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testSimpleA(self):
        """
        Basics: A query without EDNS
        """
        name = 'simplea.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testAnyIsTruncated(self):
        """
        Basics: Truncate ANY query

        dnsdist is configured to reply with TC to ANY queries,
        send an ANY query and check the result.
        It should be truncated over UDP, not over TCP.
        """
        name = 'any.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')

        # Over UDP we expect an empty response with the TC bit set.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        # Over TCP we expect the full response supplied here to come back.
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testTruncateTC(self):
        """
        Basics: Truncate TC

        dnsdist is configured to truncate TC (default),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        response.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        # Fail with a clear assertion instead of an AttributeError below
        # when nothing was received.
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        # Header and question must be untouched, but every record section
        # must have been emptied.
        self.assertEqual(response.flags, receivedResponse.flags)
        self.assertEqual(response.question, receivedResponse.question)
        self.assertFalse(response.answer == receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)

    def testRegexReturnsRefused(self):
        """
        Basics: Refuse query matching regex

        dnsdist is configured to reply 'refused' for query
        matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
        We send a query for evil4242.regex.tests.powerdns.com
        and check that the response is "refused".
        """
        name = 'evil4242.regex.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDomainAndQTypeReturnsNotImplemented(self):
        """
        Basics: NOTIMPL for specific name and qtype

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "nameAndQtype.tests.powerdns.com."
        and check that the response is 'not implemented'.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDomainWithoutQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qtype canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send an A query for "nameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testOtherDomainANDQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qname canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'OtherNameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testWrongResponse(self):
        """
        Basics: Unrelated response from the backend

        The backend sends an unrelated answer over UDP, it should
        be discarded by dnsdist. It could happen if we wrap around
        maxOutstanding queries too quickly or have more than maxOutstanding
        queries to a specific backend in the air over UDP,
        but does not really make sense over TCP.
        """
        name = 'query.unrelated.tests.powerdns.com.'
        unrelatedName = 'answer.unrelated.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
        unrelatedResponse = dns.message.make_response(unrelatedQuery)
        rrset = dns.rrset.from_text(unrelatedName,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        unrelatedResponse.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, unrelatedResponse)
        self.assertTrue(receivedQuery)
        self.assertIsNone(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, unrelatedResponse)
        self.assertTrue(receivedQuery)
        self.assertIsNone(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)

    def testHeaderOnlyRefused(self):
        """
        Basics: Header-only refused response
        """
        name = 'header-only-refused-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.REFUSED)
        # Strip the question section so only the header remains.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testHeaderOnlyNoErrorResponse(self):
        """
        Basics: Header-only NoError response should be dropped
        """
        name = 'header-only-noerror-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # Strip the question section so only the header remains.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

    def testHeaderOnlyNXDResponse(self):
        """
        Basics: Header-only NXD response should be dropped
        """
        name = 'header-only-nxd-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.NXDOMAIN)
        # Strip the question section so only the header remains.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

    def testAddActionDNSName(self):
        """
        Basics: test if addAction accepts a DNSName
        """
        name = 'dnsname.addaction.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testAddActionDNSNames(self):
        """
        Basics: test if addAction accepts a table of DNSNames
        """
        # The configuration registers dnsname-table1 and dnsname-table2;
        # range(1, 3) covers both (the upper bound is exclusive).
        for i in range(1, 3):
            name = 'dnsname-table{}.addaction.powerdns.com.'.format(i)
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
408
if __name__ == '__main__':
    # unittest.main() terminates the interpreter itself (it calls
    # sys.exit() with a status reflecting the test outcome), so no
    # explicit exit call is needed after it.
    unittest.main()