]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Basics.py
dnsdist: Remove BlockFilter
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
CommitLineData
ca404e94 1#!/usr/bin/env python
ca404e94 2import unittest
b1bec9f0
RG
3import dns
4import clientsubnetoption
ca404e94
RG
5from dnsdisttests import DNSDistTest
6
7class TestBasics(DNSDistTest):
8
903853f4
RG
9 _config_template = """
10 newServer{address="127.0.0.1:%s"}
11 truncateTC(true)
12 addAnyTCRule()
55baa1f2 13 addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
903853f4
RG
14 mySMN = newSuffixMatchNode()
15 mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
55baa1f2 16 addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
903853f4 17 addAction(makeRule("drop.test.powerdns.com."), DropAction())
feb22f99 18 addAction(AndRule({QTypeRule(dnsdist.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
f850b032
PL
19 addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
20 addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
903853f4
RG
21 """
22
b63add03
RG
23 def testDropped(self):
24 """
25 Basics: Dropped query
26
27 Send an A query for drop.test.powerdns.com. domain,
28 which is dropped by configuration. We expect
29 no response.
30 """
31 name = 'drop.test.powerdns.com.'
32 query = dns.message.make_query(name, 'A', 'IN')
33 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
34 self.assertEquals(receivedResponse, None)
35
36 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
37 self.assertEquals(receivedResponse, None)
38
ca404e94
RG
39 def testAWithECS(self):
40 """
617dfe22 41 Basics: A query with an ECS value
ca404e94
RG
42 """
43 name = 'awithecs.tests.powerdns.com.'
44 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
45 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
46 response = dns.message.make_response(query)
47 rrset = dns.rrset.from_text(name,
feb22f99 48 60,
ca404e94
RG
49 dns.rdataclass.IN,
50 dns.rdatatype.A,
51 '127.0.0.1')
52
53 response.answer.append(rrset)
54
55 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
56 receivedQuery.id = query.id
ca404e94
RG
57 self.assertEquals(query, receivedQuery)
58 self.assertEquals(response, receivedResponse)
59
60 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
61 receivedQuery.id = query.id
ca404e94
RG
62 self.assertEquals(query, receivedQuery)
63 self.assertEquals(response, receivedResponse)
64
65 def testSimpleA(self):
66 """
617dfe22 67 Basics: A query without EDNS
ca404e94
RG
68 """
69 name = 'simplea.tests.powerdns.com.'
70 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
71 response = dns.message.make_response(query)
e7a1029c 72 rrset = dns.rrset.from_text(name,
ca404e94
RG
73 3600,
74 dns.rdataclass.IN,
75 dns.rdatatype.A,
76 '127.0.0.1')
77 response.answer.append(rrset)
78
79 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
80 self.assertTrue(receivedQuery)
81 self.assertTrue(receivedResponse)
82 receivedQuery.id = query.id
ca404e94
RG
83 self.assertEquals(query, receivedQuery)
84 self.assertEquals(response, receivedResponse)
85
86 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
87 self.assertTrue(receivedQuery)
88 self.assertTrue(receivedResponse)
89 receivedQuery.id = query.id
ca404e94
RG
90 self.assertEquals(query, receivedQuery)
91 self.assertEquals(response, receivedResponse)
92
ec5f5c6b
RG
93 def testAnyIsTruncated(self):
94 """
617dfe22
RG
95 Basics: Truncate ANY query
96
ec5f5c6b
RG
97 dnsdist is configured to reply with TC to ANY queries,
98 send an ANY query and check the result.
490a29bb 99 It should be truncated over UDP, not over TCP.
ec5f5c6b
RG
100 """
101 name = 'any.tests.powerdns.com.'
102 query = dns.message.make_query(name, 'ANY', 'IN')
103 expectedResponse = dns.message.make_response(query)
104 expectedResponse.flags |= dns.flags.TC
105
0a2087eb 106 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
ec5f5c6b
RG
107 self.assertEquals(receivedResponse, expectedResponse)
108
490a29bb 109 response = dns.message.make_response(query)
e7a1029c 110 rrset = dns.rrset.from_text(name,
490a29bb
RG
111 3600,
112 dns.rdataclass.IN,
113 dns.rdatatype.A,
114 '127.0.0.1')
115
116 response.answer.append(rrset)
117
118 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
119 self.assertTrue(receivedQuery)
120 self.assertTrue(receivedResponse)
121 receivedQuery.id = query.id
490a29bb
RG
122 self.assertEquals(query, receivedQuery)
123 self.assertEquals(receivedResponse, response)
ec5f5c6b
RG
124
125 def testTruncateTC(self):
126 """
617dfe22
RG
127 Basics: Truncate TC
128
ec5f5c6b
RG
129 dnsdist is configured to truncate TC (default),
130 we make the backend send responses
131 with TC set and additional content,
132 and check that the received response has been fixed.
133 """
134 name = 'atruncatetc.tests.powerdns.com.'
135 query = dns.message.make_query(name, 'A', 'IN')
136 response = dns.message.make_response(query)
137 rrset = dns.rrset.from_text(name,
138 3600,
139 dns.rdataclass.IN,
140 dns.rdatatype.A,
141 '127.0.0.1')
142
143 response.answer.append(rrset)
144 response.flags |= dns.flags.TC
145
146 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
147 receivedQuery.id = query.id
ec5f5c6b
RG
148 self.assertEquals(query, receivedQuery)
149 self.assertEquals(response.flags, receivedResponse.flags)
150 self.assertEquals(response.question, receivedResponse.question)
151 self.assertFalse(response.answer == receivedResponse.answer)
152 self.assertEquals(len(receivedResponse.answer), 0)
153 self.assertEquals(len(receivedResponse.authority), 0)
154 self.assertEquals(len(receivedResponse.additional), 0)
155
156 def testRegexReturnsRefused(self):
157 """
617dfe22
RG
158 Basics: Refuse query matching regex
159
ec5f5c6b
RG
160 dnsdist is configured to reply 'refused' for query
161 matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
162 We send a query for evil4242.powerdns.com
163 and check that the response is "refused".
164 """
165 name = 'evil4242.regex.tests.powerdns.com.'
166 query = dns.message.make_query(name, 'A', 'IN')
167 expectedResponse = dns.message.make_response(query)
168 expectedResponse.set_rcode(dns.rcode.REFUSED)
169
0a2087eb 170 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
ec5f5c6b
RG
171 self.assertEquals(receivedResponse, expectedResponse)
172
0a2087eb 173 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
ec5f5c6b
RG
174 self.assertEquals(receivedResponse, expectedResponse)
175
feb22f99 176 def testQNameReturnsSpoofed(self):
177 """
178 Basics: test QNameRule and Spoof
179
180 dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
181 """
182 name = 'ds9a.nl.'
183 query = dns.message.make_query(name, 'A', 'IN')
184 query.flags &= ~dns.flags.RD
185 expectedResponse = dns.message.make_response(query)
186 expectedResponse.set_rcode(dns.rcode.NOERROR)
187 rrset = dns.rrset.from_text(name,
188 3600,
189 dns.rdataclass.IN,
190 dns.rdatatype.A,
191 '1.2.3.4')
192 expectedResponse.answer.append(rrset)
193
194 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
195 self.assertEquals(receivedResponse, expectedResponse)
196
197 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
198 self.assertEquals(receivedResponse, expectedResponse)
199
200
ec5f5c6b
RG
201 def testDomainAndQTypeReturnsNotImplemented(self):
202 """
617dfe22
RG
203 Basics: NOTIMPL for specific name and qtype
204
ec5f5c6b 205 dnsdist is configured to reply 'not implemented' for query
e7a1029c 206 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
ec5f5c6b
RG
207 We send a TXT query for "nameAndQtype.powerdns.com."
208 and check that the response is 'not implemented'.
209 """
210 name = 'nameAndQtype.tests.powerdns.com.'
211 query = dns.message.make_query(name, 'TXT', 'IN')
212 expectedResponse = dns.message.make_response(query)
213 expectedResponse.set_rcode(dns.rcode.NOTIMP)
214
0a2087eb 215 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
ec5f5c6b
RG
216 self.assertEquals(receivedResponse, expectedResponse)
217
0a2087eb 218 (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
ec5f5c6b
RG
219 self.assertEquals(receivedResponse, expectedResponse)
220
221 def testDomainWithoutQTypeIsNotAffected(self):
222 """
617dfe22
RG
223 Basics: NOTIMPL qtype canary
224
ec5f5c6b 225 dnsdist is configured to reply 'not implemented' for query
e7a1029c 226 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
ec5f5c6b
RG
227 We send a A query for "nameAndQtype.tests.powerdns.com."
228 and check that the response is OK.
229 """
230 name = 'nameAndQtype.tests.powerdns.com.'
231 query = dns.message.make_query(name, 'A', 'IN')
232 response = dns.message.make_response(query)
233 rrset = dns.rrset.from_text(name,
234 3600,
235 dns.rdataclass.IN,
236 dns.rdatatype.A,
237 '127.0.0.1')
238 response.answer.append(rrset)
239
240 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
241 self.assertTrue(receivedQuery)
242 self.assertTrue(receivedResponse)
243 receivedQuery.id = query.id
ec5f5c6b
RG
244 self.assertEquals(query, receivedQuery)
245 self.assertEquals(response, receivedResponse)
246
247 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
248 self.assertTrue(receivedQuery)
249 self.assertTrue(receivedResponse)
250 receivedQuery.id = query.id
ec5f5c6b
RG
251 self.assertEquals(query, receivedQuery)
252 self.assertEquals(response, receivedResponse)
253
254 def testOtherDomainANDQTypeIsNotAffected(self):
255 """
617dfe22
RG
256 Basics: NOTIMPL qname canary
257
ec5f5c6b 258 dnsdist is configured to reply 'not implemented' for query
e7a1029c 259 matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
ec5f5c6b
RG
260 We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
261 and check that the response is OK.
262 """
263 name = 'OtherNameAndQtype.tests.powerdns.com.'
264 query = dns.message.make_query(name, 'TXT', 'IN')
265 response = dns.message.make_response(query)
266 rrset = dns.rrset.from_text(name,
267 3600,
268 dns.rdataclass.IN,
269 dns.rdatatype.TXT,
270 'nothing to see here')
271 response.answer.append(rrset)
272
273 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
274 self.assertTrue(receivedQuery)
275 self.assertTrue(receivedResponse)
276 receivedQuery.id = query.id
ec5f5c6b
RG
277 self.assertEquals(query, receivedQuery)
278 self.assertEquals(response, receivedResponse)
279
280 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
281 self.assertTrue(receivedQuery)
282 self.assertTrue(receivedResponse)
283 receivedQuery.id = query.id
ec5f5c6b
RG
284 self.assertEquals(query, receivedQuery)
285 self.assertEquals(response, receivedResponse)
286
7267213a
RG
287 def testWrongResponse(self):
288 """
289 Basics: Unrelated response from the backend
290
291 The backend send an unrelated answer over UDP, it should
292 be discarded by dnsdist. It could happen if we wrap around
de4896ba 293 maxOutstanding queries too quickly or have more than maxOutstanding
7267213a
RG
294 queries to a specific backend in the air over UDP,
295 but does not really make sense over TCP.
296 """
297 name = 'query.unrelated.tests.powerdns.com.'
298 unrelatedName = 'answer.unrelated.tests.powerdns.com.'
299 query = dns.message.make_query(name, 'TXT', 'IN')
300 unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
301 unrelatedResponse = dns.message.make_response(unrelatedQuery)
302 rrset = dns.rrset.from_text(unrelatedName,
303 3600,
304 dns.rdataclass.IN,
305 dns.rdatatype.TXT,
306 'nothing to see here')
307 unrelatedResponse.answer.append(rrset)
308
309 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, unrelatedResponse)
310 self.assertTrue(receivedQuery)
311 self.assertEquals(receivedResponse, None)
312 receivedQuery.id = query.id
313 self.assertEquals(query, receivedQuery)
314
f87c4aff
RG
315 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, unrelatedResponse)
316 self.assertTrue(receivedQuery)
317 self.assertEquals(receivedResponse, None)
318 receivedQuery.id = query.id
319 self.assertEquals(query, receivedQuery)
320
c8c3d4e4
RG
321 def testHeaderOnlyRefused(self):
322 """
323 Basics: Header-only refused response
324 """
325 name = 'header-only-refused-response.tests.powerdns.com.'
326 query = dns.message.make_query(name, 'A', 'IN')
327 response = dns.message.make_response(query)
328 response.set_rcode(dns.rcode.REFUSED)
329 response.question = []
330
331 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
332 self.assertTrue(receivedQuery)
333 receivedQuery.id = query.id
334 self.assertEquals(query, receivedQuery)
335 self.assertEquals(receivedResponse, response)
336
337 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
338 self.assertTrue(receivedQuery)
339 receivedQuery.id = query.id
340 self.assertEquals(query, receivedQuery)
341 self.assertEquals(receivedResponse, response)
342
343 def testHeaderOnlyNoErrorResponse(self):
344 """
345 Basics: Header-only NoError response should be dropped
346 """
347 name = 'header-only-noerror-response.tests.powerdns.com.'
348 query = dns.message.make_query(name, 'A', 'IN')
349 response = dns.message.make_response(query)
350 response.question = []
351
352 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
353 self.assertTrue(receivedQuery)
354 receivedQuery.id = query.id
355 self.assertEquals(query, receivedQuery)
356 self.assertEquals(receivedResponse, None)
357
358 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
359 self.assertTrue(receivedQuery)
360 receivedQuery.id = query.id
361 self.assertEquals(query, receivedQuery)
362 self.assertEquals(receivedResponse, None)
363
364 def testHeaderOnlyNXDResponse(self):
365 """
366 Basics: Header-only NXD response should be dropped
367 """
368 name = 'header-only-nxd-response.tests.powerdns.com.'
369 query = dns.message.make_query(name, 'A', 'IN')
370 response = dns.message.make_response(query)
371 response.set_rcode(dns.rcode.NXDOMAIN)
372 response.question = []
373
374 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
375 self.assertTrue(receivedQuery)
376 receivedQuery.id = query.id
377 self.assertEquals(query, receivedQuery)
378 self.assertEquals(receivedResponse, None)
379
380 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
381 self.assertTrue(receivedQuery)
382 receivedQuery.id = query.id
383 self.assertEquals(query, receivedQuery)
384 self.assertEquals(receivedResponse, None)
385
f850b032
PL
386 def testAddActionDNSName(self):
387 """
388 Basics: test if addAction accepts a DNSName
389 """
390 name = 'dnsname.addaction.powerdns.com.'
391 query = dns.message.make_query(name, 'A', 'IN')
392 expectedResponse = dns.message.make_response(query)
393 expectedResponse.set_rcode(dns.rcode.REFUSED)
394
395 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
396 self.assertEquals(receivedResponse, expectedResponse)
397
398 def testAddActionDNSNames(self):
399 """
400 Basics: test if addAction accepts a table of DNSNames
401 """
402 for name in ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1,2)]:
403 query = dns.message.make_query(name, 'A', 'IN')
404 expectedResponse = dns.message.make_response(query)
405 expectedResponse.set_rcode(dns.rcode.REFUSED)
406
407 (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
408 self.assertEquals(receivedResponse, expectedResponse)
ca404e94
RG
409
410if __name__ == '__main__':
411 unittest.main()
412 exit(0)