]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_Basics.py
update tests to new naming
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
CommitLineData
ca404e94 1#!/usr/bin/env python
ca404e94 2import unittest
b1bec9f0
RG
3import dns
4import clientsubnetoption
ca404e94
RG
5from dnsdisttests import DNSDistTest
6
class TestBasics(DNSDistTest):
    """
    Basic dnsdist regression tests.

    Each test builds a DNS query with dnspython, sends it through the
    helpers inherited from DNSDistTest (sendUDPQuery / sendTCPQuery) and
    checks what comes back against the rules declared in
    _config_template: dropping, truncation, RCode overrides, spoofing
    and plain forwarding to the backend.
    """

    # dnsdist (Lua) configuration used for this test class.
    # NOTE(review): the single %s placeholder is presumably filled in by
    # DNSDistTest with the port of the test backend - confirm in dnsdisttests.
    # The regex backslashes are escaped twice on purpose: once for this
    # Python string literal and once for the Lua string literal.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    truncateTC(true)
    addAction(AndRule{QTypeRule(DNSQType.ANY), TCPRule(false)}, TCAction())
    addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
    mySMN = newSuffixMatchNode()
    mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
    addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
    addAction(makeRule("drop.test.powerdns.com."), DropAction())
    addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
    addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
    addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
    """

    def testDropped(self):
        """
        Basics: Dropped query

        Send an A query for drop.test.powerdns.com. domain,
        which is dropped by configuration. We expect
        no response.
        """
        name = 'drop.test.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # The query never reaches the backend, so no canned response
            # is queued and nothing is read back from the backend queue.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, None)

    def testAWithECS(self):
        """
        Basics: A query with an ECS value
        """
        name = 'awithecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            # Fail with a clear assertion rather than an AttributeError
            # if either side of the exchange is missing.
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # The ID seen by the backend is overwritten with the client's
            # before comparing, so an ID difference does not fail the test.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testSimpleA(self):
        """
        Basics: A query without EDNS
        """
        name = 'simplea.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAnyIsTruncated(self):
        """
        Basics: Truncate ANY query

        dnsdist is configured to reply with TC to ANY queries,
        send an ANY query and check the result.
        It should be truncated over UDP, not over TCP.
        """
        name = 'any.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        # UDP: answered by dnsdist itself with an empty, TC-flagged response.
        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)

        # TCP: the rule is restricted with TCPRule(false), so the query
        # must be forwarded and the backend response relayed unchanged.
        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testTruncateTC(self):
        """
        Basics: Truncate TC

        dnsdist is configured to truncate TC (default),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)
        response.flags |= dns.flags.TC
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(expectedResponse.flags, receivedResponse.flags)
        self.assertEqual(expectedResponse.question, receivedResponse.question)
        # Every record section must have been stripped from the response.
        self.assertFalse(response.answer == receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)
        self.checkMessageNoEDNS(expectedResponse, receivedResponse)

    def testTruncateTCEDNS(self):
        """
        Basics: Truncate TC with EDNS

        dnsdist is configured to truncate TC (default),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        Note that the query and initial response had EDNS,
        so the final response should have it too.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
        response = dns.message.make_response(query)
        # force a different responder payload than the one in the query,
        # so we check that we don't just mirror it
        response.payload = 4242
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')

        response.answer.append(rrset)
        response.flags |= dns.flags.TC
        expectedResponse = dns.message.make_response(query)
        expectedResponse.payload = 4242
        expectedResponse.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response.flags, receivedResponse.flags)
        self.assertEqual(response.question, receivedResponse.question)
        self.assertFalse(response.answer == receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)
        self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
        # The DO bit must be cleared and the backend's payload size kept.
        self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
        self.assertEqual(receivedResponse.payload, 4242)

    def testRegexReturnsRefused(self):
        """
        Basics: Refuse query matching regex

        dnsdist is configured to reply 'refused' for query
        matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
        We send a query for evil4242.regex.tests.powerdns.com.
        and check that the response is "refused".
        """
        name = 'evil4242.regex.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testQNameReturnsSpoofed(self):
        """
        Basics: test QNameRule and Spoof

        dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
        """
        name = 'ds9a.nl.'
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear the RD flag on the query before building the expected response.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOERROR)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testDomainAndQTypeReturnsNotImplemented(self):
        """
        Basics: NOTIMPL for specific name and qtype

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "nameAndQtype.tests.powerdns.com."
        and check that the response is 'not implemented'.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testDomainWithoutQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qtype canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a A query for "nameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testOtherDomainANDQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qname canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'OtherNameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testWrongResponse(self):
        """
        Basics: Unrelated response from the backend

        The backend sends an unrelated answer over UDP, it should
        be discarded by dnsdist. It could happen if we wrap around
        maxOutstanding queries too quickly or have more than maxOutstanding
        queries to a specific backend in the air over UDP,
        but does not really make sense over TCP.
        """
        name = 'query.unrelated.tests.powerdns.com.'
        unrelatedName = 'answer.unrelated.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
        unrelatedResponse = dns.message.make_response(unrelatedQuery)
        rrset = dns.rrset.from_text(unrelatedName,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        unrelatedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, unrelatedResponse)
            # The backend got the query, but its mismatching answer must
            # not be relayed to the client.
            self.assertTrue(receivedQuery)
            self.assertEqual(receivedResponse, None)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)

    def testHeaderOnlyRefused(self):
        """
        Basics: Header-only refused response
        """
        name = 'header-only-refused-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.REFUSED)
        # Drop the question section so the response is header-only.
        response.question = []

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

    def testHeaderOnlyNoErrorResponse(self):
        """
        Basics: Header-only NoError response should be dropped
        """
        name = 'header-only-noerror-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # Drop the question section so the response is header-only.
        response.question = []

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, None)

    def testHeaderOnlyNXDResponse(self):
        """
        Basics: Header-only NXD response should be dropped
        """
        name = 'header-only-nxd-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.NXDOMAIN)
        # Drop the question section so the response is header-only.
        response.question = []

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, None)

    def testAddActionDNSName(self):
        """
        Basics: test if addAction accepts a DNSName
        """
        name = 'dnsname.addaction.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testAddActionDNSNames(self):
        """
        Basics: test if addAction accepts a table of DNSNames
        """
        # range(1, 3) covers both dnsname-table1 and dnsname-table2, the
        # two names registered in the table passed to addAction.
        for name in ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1, 3)]:
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            for method in ("sendUDPQuery", "sendTCPQuery"):
                sender = getattr(self, method)
                (_, receivedResponse) = sender(query, response=None, useQueue=False)
                self.assertEqual(receivedResponse, expectedResponse)
ca404e94 421