git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_Basics.py
Add autocompletion data
[thirdparty/pdns.git] / regression-tests.dnsdist / test_Basics.py
1 #!/usr/bin/env python
2 import unittest
3 import dns
4 import clientsubnetoption
5 from dnsdisttests import DNSDistTest
6
class TestBasics(DNSDistTest):
    """
    Basic dnsdist regression tests.

    Each test sends a query through dnsdist (over UDP, TCP or both) using
    the senders inherited from DNSDistTest and checks the behaviour of the
    rules declared in ``_config_template`` (drop, truncate, refuse, spoof,
    "not implemented") as well as the handling of unusual backend
    responses.
    """

    # dnsdist (Lua) configuration shared by every test in this class.
    # NOTE(review): the "%s" placeholder is presumably filled in by the
    # DNSDistTest harness with the backend port -- confirm in dnsdisttests.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    truncateTC(true)
    addAction(AndRule{QTypeRule(dnsdist.ANY), TCPRule(false)}, TCAction())
    addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
    mySMN = newSuffixMatchNode()
    mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
    addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
    addAction(makeRule("drop.test.powerdns.com."), DropAction())
    addAction(AndRule({QTypeRule(dnsdist.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
    addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
    addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
    """

    def testDropped(self):
        """
        Basics: Dropped query

        Send an A query for drop.test.powerdns.com. domain,
        which is dropped by configuration. We expect
        no response.
        """
        name = 'drop.test.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        # Same expectation over both transports, UDP first.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertIsNone(receivedResponse)

    def testAWithECS(self):
        """
        Basics: A query with an ECS value
        """
        name = 'awithecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            # Fail with a clean assertion, rather than an AttributeError
            # below, when nothing came back.
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # The ID seen by the backend differs from the one we sent, so
            # align it before comparing the messages.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testSimpleA(self):
        """
        Basics: A query without EDNS
        """
        name = 'simplea.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testAnyIsTruncated(self):
        """
        Basics: Truncate ANY query

        dnsdist is configured to reply with TC to ANY queries,
        send an ANY query and check the result.
        It should be truncated over UDP, not over TCP.
        """
        name = 'any.tests.powerdns.com.'
        query = dns.message.make_query(name, 'ANY', 'IN')

        # Over UDP we expect an empty response with the TC bit set.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.flags |= dns.flags.TC

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        # Over TCP the query must be answered with the backend's response.
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(receivedResponse, response)

    def testTruncateTC(self):
        """
        Basics: Truncate TC

        dnsdist is configured to truncate TC (default),
        we make the backend send responses
        with TC set and additional content,
        and check that the received response has been fixed.
        """
        name = 'atruncatetc.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        response.flags |= dns.flags.TC

        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        # Flags and question are preserved, every record section is emptied.
        self.assertEqual(response.flags, receivedResponse.flags)
        self.assertEqual(response.question, receivedResponse.question)
        self.assertNotEqual(response.answer, receivedResponse.answer)
        self.assertEqual(len(receivedResponse.answer), 0)
        self.assertEqual(len(receivedResponse.authority), 0)
        self.assertEqual(len(receivedResponse.additional), 0)

    def testRegexReturnsRefused(self):
        """
        Basics: Refuse query matching regex

        dnsdist is configured to reply 'refused' for query
        matching "evil[0-9]{4,}\\.regex\\.tests\\.powerdns\\.com$".
        We send a query for evil4242.regex.tests.powerdns.com
        and check that the response is "refused".
        """
        name = 'evil4242.regex.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testQNameReturnsSpoofed(self):
        """
        Basics: test QNameRule and Spoof

        dnsdist is configured to reply 1.2.3.4 for A query for exactly ds9a.nl
        """
        name = 'ds9a.nl.'
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear the RD bit on the query (and thus on the expected response).
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOERROR)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testDomainAndQTypeReturnsNotImplemented(self):
        """
        Basics: NOTIMPL for specific name and qtype

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "nameAndQtype.tests.powerdns.com."
        and check that the response is 'not implemented'.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.NOTIMP)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testDomainWithoutQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qtype canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send an A query for "nameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'nameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testOtherDomainANDQTypeIsNotAffected(self):
        """
        Basics: NOTIMPL qname canary

        dnsdist is configured to reply 'not implemented' for query
        matching "nameAndQtype.tests.powerdns.com." AND qtype TXT.
        We send a TXT query for "OtherNameAndQtype.tests.powerdns.com."
        and check that the response is OK.
        """
        name = 'OtherNameAndQtype.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        response.answer.append(rrset)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testWrongResponse(self):
        """
        Basics: Unrelated response from the backend

        The backend sends an unrelated answer over UDP, it should
        be discarded by dnsdist. It could happen if we wrap around
        maxOutstanding queries too quickly or have more than maxOutstanding
        queries to a specific backend in the air over UDP,
        but does not really make sense over TCP.
        """
        name = 'query.unrelated.tests.powerdns.com.'
        unrelatedName = 'answer.unrelated.tests.powerdns.com.'
        query = dns.message.make_query(name, 'TXT', 'IN')
        unrelatedQuery = dns.message.make_query(unrelatedName, 'TXT', 'IN')
        unrelatedResponse = dns.message.make_response(unrelatedQuery)
        rrset = dns.rrset.from_text(unrelatedName,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.TXT,
                                    'nothing to see here')
        unrelatedResponse.answer.append(rrset)

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, unrelatedResponse)
            # The query must reach the backend, but its unrelated answer
            # must not be relayed to the client.
            self.assertTrue(receivedQuery)
            self.assertIsNone(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)

    def testHeaderOnlyRefused(self):
        """
        Basics: Header-only refused response
        """
        name = 'header-only-refused-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.REFUSED)
        # Strip the question section so only the header remains.
        response.question = []

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(receivedResponse, response)

    def testHeaderOnlyNoErrorResponse(self):
        """
        Basics: Header-only NoError response should be dropped
        """
        name = 'header-only-noerror-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # Strip the question section so only the header remains.
        response.question = []

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertIsNone(receivedResponse)

    def testHeaderOnlyNXDResponse(self):
        """
        Basics: Header-only NXD response should be dropped
        """
        name = 'header-only-nxd-response.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.set_rcode(dns.rcode.NXDOMAIN)
        # Strip the question section so only the header remains.
        response.question = []

        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertIsNone(receivedResponse)

    def testAddActionDNSName(self):
        """
        Basics: test if addAction accepts a DNSName
        """
        name = 'dnsname.addaction.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testAddActionDNSNames(self):
        """
        Basics: test if addAction accepts a table of DNSNames
        """
        # The configuration table holds dnsname-table1 and dnsname-table2;
        # range(1, 3) covers both of them.
        for i in range(1, 3):
            name = 'dnsname-table{}.addaction.powerdns.com.'.format(i)
            query = dns.message.make_query(name, 'A', 'IN')
            expectedResponse = dns.message.make_response(query)
            expectedResponse.set_rcode(dns.rcode.REFUSED)

            (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
409
if __name__ == '__main__':
    # unittest.main() runs the tests and then exits the interpreter itself
    # (its ``exit`` parameter defaults to True), with a status reflecting the
    # test outcome, so no statement placed after it would ever run.
    unittest.main()