]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
Merge pull request #4955 from klaus3000/upgradingto40
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
1 #!/usr/bin/env python
2 import unittest
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
class TestBasics(DNSDistTest):
    """
    Regression tests for basic dnsdist rules and response handling.

    Each test builds a query with dnspython, sends it with the
    sendUDPQuery / sendTCPQuery helpers inherited from DNSDistTest and
    compares the outcome with the rules declared in _config_template.
    """

    # Lua configuration for the dnsdist instance under test.
    # NOTE(review): the "%s" placeholder is presumably substituted by
    # DNSDistTest with the port of the test backend -- confirm in
    # dnsdisttests, it is not visible from this file.
    # The Lua lines are kept flush left so the string content is unchanged.
    _config_template = """
newServer{address="127.0.0.1:%s"}
truncateTC(true)
addAnyTCRule()
addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
mySMN = newSuffixMatchNode()
mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
addAction(makeRule("drop.test.powerdns.com."), DropAction())
addAction(AndRule({QTypeRule(dnsdist.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
block=newDNSName("powerdns.org.")
function blockFilter(dq)
if(dq.qname:isPartOf(block))
then
print("Blocking *.powerdns.org")
return true
end
return false
end
"""

    def testDropped(self):
        """
        Basics: Dropped query

        Send an A query for drop.test.powerdns.com. domain,
        which is dropped by configuration. We expect
        no response.
        """
        name = 'drop.test.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

    def testBlockedA(self):
        """
        Basics: Blocked A query

        Send an A query for the powerdns.org domain,
        which is blocked by configuration. We expect
        no response.
        """
        name = 'blockeda.tests.powerdns.org.'
        query = dns.message.make_query(name, 'A', 'IN')
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

    def testAWithECS(self):
        """
        Basics: A query with an ECS value
        """
        name = 'awithecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        # Fail with a clear assertion instead of an AttributeError on
        # "receivedQuery.id" when nothing came back.
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # The ID is overwritten before comparing, so it does not take
        # part in the equality check below.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testSimpleA(self):
        """
        Basics: A query without EDNS
        """
        name = 'simplea.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testAnyIsTruncated(self):
        """
        Basics: Truncate ANY query

        dnsdist is configured to reply with TC to ANY queries,
        send an ANY query and check the result.
        It should be truncated over UDP, not over TCP.
        """
        name = 'any.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        # Over UDP the expected reply is the bare response with TC set.
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)

        # Over TCP the full answer supplied here must come back unchanged.
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testTruncateTC(self):
        """
        Basics: Truncate TC

        dnsdist is configured to truncate TC (default),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)
        response.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        # Fail with a clear assertion instead of an AttributeError when
        # nothing came back.
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response.flags, receivedResponse.flags)
        self.assertEqual(response.question, receivedResponse.question)
        # The record put in the answer section must have been stripped.
        self.assertFalse(response.answer == receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)

    def testRegexReturnsRefused(self):
        """
        Basics: Refuse query matching regex

        dnsdist is configured to reply 'refused' for query
        matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
        We send a query for evil4242.regex.tests.powerdns.com
        and check that the response is "refused".
        """
        name = 'evil4242.regex.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testQNameReturnsSpoofed(self):
        """
        Basics: test QNameRule and Spoof

        dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
        """
        name = 'ds9a.nl.'
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear the RD flag on the query; the expected response is
        # derived from the query after this change.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOERROR)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDomainAndQTypeReturnsNotImplemented(self):
        """
        Basics: NOTIMPL for specific name and qtype

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "nameAndQtype.tests.powerdns.com."
        and check that the response is 'not implemented'.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDomainWithoutQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qtype canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send an A query for "nameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testOtherDomainANDQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qname canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'OtherNameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testWrongResponse(self):
        """
        Basics: Unrelated response from the backend

        The backend sends an unrelated answer over UDP, it should
        be discarded by dnsdist. It could happen if we wrap around
        maxOutstanding queries too quickly or have more than maxOutstanding
        queries to a specific backend in the air over UDP,
        but does not really make sense over TCP.
        """
        name = 'query.unrelated.tests.powerdns.com.'
        unrelatedName = 'answer.unrelated.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
        unrelatedResponse = dns.message.make_response(unrelatedQuery)
        rrset = dns.rrset.from_text(unrelatedName,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        unrelatedResponse.answer.append(rrset)

        # The query must arrive, but the mismatching answer must not be
        # relayed back to us.
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, unrelatedResponse)
        self.assertTrue(receivedQuery)
        self.assertIsNone(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, unrelatedResponse)
        self.assertTrue(receivedQuery)
        self.assertIsNone(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)

    def testHeaderOnlyRefused(self):
        """
        Basics: Header-only refused response
        """
        name = 'header-only-refused-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.REFUSED)
        # Empty the question section to obtain a header-only response.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testHeaderOnlyNoErrorResponse(self):
        """
        Basics: Header-only NoError response should be dropped
        """
        name = 'header-only-noerror-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # Empty the question section to obtain a header-only response.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

    def testHeaderOnlyNXDResponse(self):
        """
        Basics: Header-only NXD response should be dropped
        """
        name = 'header-only-nxd-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.NXDOMAIN)
        # Empty the question section to obtain a header-only response.
        response.question = []

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertIsNone(receivedResponse)

    def testAddActionDNSName(self):
        """
        Basics: test if addAction accepts a DNSName
        """
        name = 'dnsname.addaction.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testAddActionDNSNames(self):
        """
        Basics: test if addAction accepts a table of DNSNames
        """
        # The configuration registers dnsname-table1 and dnsname-table2;
        # range(1, 3) covers both (range(1, 2) only yielded table1).
        for i in range(1, 3):
            name = 'dnsname-table{}.addaction.powerdns.com.'.format(i)
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
434
if __name__ == '__main__':
    # unittest.main() terminates the interpreter itself: by default it
    # calls sys.exit() with a status reflecting the test outcome, so a
    # trailing exit(0) could never run and is not needed.
    unittest.main()