]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_RecordsCount.py
dnsdist: Add a new response chain for XFR responses
[thirdparty/pdns.git] / regression-tests.dnsdist / test_RecordsCount.py
CommitLineData
55baa1f2
RG
1#!/usr/bin/env python
2import copy
3import os
4import dns
5from dnsdisttests import DNSDistTest
6
7class TestRecordsCountOnlyOneAR(DNSDistTest):
8
9 _config_template = """
d3ec24f9 10 addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(DNSRCode.REFUSED))
55baa1f2
RG
11 newServer{address="127.0.0.1:%s"}
12 """
13
14 def testRecordsCountRefuseEmptyAR(self):
15 """
65fc9d08 16 RecordsCount: Refuse arcount == 0 (No OPT)
55baa1f2
RG
17
18 Send a query to "refuseemptyar.recordscount.tests.powerdns.com.",
19 check that we are getting a REFUSED response.
20 """
21 name = 'refuseemptyar.recordscount.tests.powerdns.com.'
22 query = dns.message.make_query(name, 'A', 'IN')
7af22479 23 query.flags &= ~dns.flags.RD
55baa1f2
RG
24 expectedResponse = dns.message.make_response(query)
25 expectedResponse.set_rcode(dns.rcode.REFUSED)
26
6ca2e796
RG
27 for method in ("sendUDPQuery", "sendTCPQuery"):
28 sender = getattr(self, method)
29 (_, receivedResponse) = sender(query, response=None, useQueue=False)
4bfebc93 30 self.assertEqual(receivedResponse, expectedResponse)
55baa1f2
RG
31
32 def testRecordsCountAllowOneAR(self):
33 """
65fc9d08 34 RecordsCount: Allow arcount == 1 (OPT)
55baa1f2
RG
35
36 Send a query to "allowonear.recordscount.tests.powerdns.com.",
37 check that we are getting a valid response.
38 """
39 name = 'allowonear.recordscount.tests.powerdns.com.'
40 query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
41 response = dns.message.make_response(query)
42 response.answer.append(dns.rrset.from_text(name,
43 3600,
44 dns.rdataclass.IN,
45 dns.rdatatype.A,
46 '127.0.0.1'))
47
6ca2e796
RG
48 for method in ("sendUDPQuery", "sendTCPQuery"):
49 sender = getattr(self, method)
50 (receivedQuery, receivedResponse) = sender(query, response)
51 self.assertTrue(receivedQuery)
52 self.assertTrue(receivedResponse)
53 receivedQuery.id = query.id
4bfebc93
CH
54 self.assertEqual(query, receivedQuery)
55 self.assertEqual(response, receivedResponse)
55baa1f2
RG
56
57 def testRecordsCountRefuseTwoAR(self):
58 """
65fc9d08 59 RecordsCount: Refuse arcount > 1 (OPT + a bogus additional record)
55baa1f2
RG
60
61 Send a query to "refusetwoar.recordscount.tests.powerdns.com.",
62 check that we are getting a REFUSED response.
63 """
64 name = 'refusetwoar.recordscount.tests.powerdns.com.'
65 query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
7af22479 66 query.flags &= ~dns.flags.RD
55baa1f2
RG
67 query.additional.append(dns.rrset.from_text(name,
68 3600,
69 dns.rdataclass.IN,
70 dns.rdatatype.A,
71 '127.0.0.1'))
72 expectedResponse = dns.message.make_response(query)
73 expectedResponse.set_rcode(dns.rcode.REFUSED)
6e1f856f
RG
74 # this is not great, we should fix that!
75 expectedResponse.additional.append(dns.rrset.from_text(name,
76 3600,
77 dns.rdataclass.IN,
78 dns.rdatatype.A,
79 '127.0.0.1'))
55baa1f2 80
6ca2e796
RG
81 for method in ("sendUDPQuery", "sendTCPQuery"):
82 sender = getattr(self, method)
83 (_, receivedResponse) = sender(query, response=None, useQueue=False)
4bfebc93 84 self.assertEqual(receivedResponse, expectedResponse)
55baa1f2
RG
85
86class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
87
88 _config_template = """
89 addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
d3ec24f9 90 addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
55baa1f2
RG
91 newServer{address="127.0.0.1:%s"}
92 """
93
94 def testRecordsCountRefuseOneAN(self):
95 """
96 RecordsCount: Refuse ancount == 0
97
98 Send a query to "refusenoan.recordscount.tests.powerdns.com.",
99 check that we are getting a REFUSED response.
100 """
101 name = 'refusenoan.recordscount.tests.powerdns.com.'
102 query = dns.message.make_query(name, 'A', 'IN')
7af22479 103 query.flags &= ~dns.flags.RD
55baa1f2
RG
104 expectedResponse = dns.message.make_response(query)
105 expectedResponse.set_rcode(dns.rcode.REFUSED)
106
6ca2e796
RG
107 for method in ("sendUDPQuery", "sendTCPQuery"):
108 sender = getattr(self, method)
109 (_, receivedResponse) = sender(query, response=None, useQueue=False)
4bfebc93 110 self.assertEqual(receivedResponse, expectedResponse)
55baa1f2
RG
111
112 def testRecordsCountAllowTwoAN(self):
113 """
114 RecordsCount: Allow ancount == 2
115
116 Send a query to "allowtwoan.recordscount.tests.powerdns.com.",
117 check that we are getting a valid response.
118 """
119 name = 'allowtwoan.recordscount.tests.powerdns.com.'
120 query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
121 rrset = dns.rrset.from_text_list(name,
122 3600,
123 dns.rdataclass.IN,
124 dns.rdatatype.A,
125 ['127.0.0.1', '127.0.0.2'])
126 query.answer.append(rrset)
127 response = dns.message.make_response(query)
128 response.answer.append(rrset)
129
6ca2e796
RG
130 for method in ("sendUDPQuery", "sendTCPQuery"):
131 sender = getattr(self, method)
132 (receivedQuery, receivedResponse) = sender(query, response)
133 self.assertTrue(receivedQuery)
134 self.assertTrue(receivedResponse)
135 receivedQuery.id = query.id
4bfebc93
CH
136 self.assertEqual(query, receivedQuery)
137 self.assertEqual(response, receivedResponse)
55baa1f2
RG
138
139 def testRecordsCountRefuseFourAN(self):
140 """
141 RecordsCount: Refuse ancount > 3
142
143 Send a query to "refusefouran.recordscount.tests.powerdns.com.",
144 check that we are getting a REFUSED response.
145 """
146 name = 'refusefouran.recordscount.tests.powerdns.com.'
147 query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
7af22479 148 query.flags &= ~dns.flags.RD
55baa1f2
RG
149 rrset = dns.rrset.from_text_list(name,
150 3600,
151 dns.rdataclass.IN,
152 dns.rdatatype.A,
153 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4'])
154 query.answer.append(rrset)
155
156 expectedResponse = dns.message.make_response(query)
157 expectedResponse.set_rcode(dns.rcode.REFUSED)
158 expectedResponse.answer.append(rrset)
159
6ca2e796
RG
160 for method in ("sendUDPQuery", "sendTCPQuery"):
161 sender = getattr(self, method)
162 (_, receivedResponse) = sender(query, response=None, useQueue=False)
4bfebc93 163 self.assertEqual(receivedResponse, expectedResponse)
55baa1f2
RG
164
165class TestRecordsCountNothingInNS(DNSDistTest):
166
167 _config_template = """
168 addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
d3ec24f9 169 addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
55baa1f2
RG
170 newServer{address="127.0.0.1:%s"}
171 """
172
173 def testRecordsCountRefuseNS(self):
174 """
175 RecordsCount: Refuse nscount != 0
176
177 Send a query to "refusens.recordscount.tests.powerdns.com.",
178 check that we are getting a REFUSED response.
179 """
180 name = 'refusens.recordscount.tests.powerdns.com.'
181 query = dns.message.make_query(name, 'A', 'IN')
182 rrset = dns.rrset.from_text(name,
183 3600,
184 dns.rdataclass.IN,
185 dns.rdatatype.NS,
186 'ns.tests.powerdns.com.')
187 query.authority.append(rrset)
7af22479 188 query.flags &= ~dns.flags.RD
55baa1f2
RG
189 expectedResponse = dns.message.make_response(query)
190 expectedResponse.set_rcode(dns.rcode.REFUSED)
191 expectedResponse.authority.append(rrset)
192
6ca2e796
RG
193 for method in ("sendUDPQuery", "sendTCPQuery"):
194 sender = getattr(self, method)
195 (_, receivedResponse) = sender(query, response=None, useQueue=False)
4bfebc93 196 self.assertEqual(receivedResponse, expectedResponse)
55baa1f2
RG
197
198
199 def testRecordsCountAllowEmptyNS(self):
200 """
201 RecordsCount: Allow nscount == 0
202
203 Send a query to "allowns.recordscount.tests.powerdns.com.",
204 check that we are getting a valid response.
205 """
206 name = 'allowns.recordscount.tests.powerdns.com.'
207 query = dns.message.make_query(name, 'A', 'IN')
208 response = dns.message.make_response(query)
209 response.answer.append(dns.rrset.from_text(name,
210 3600,
211 dns.rdataclass.IN,
212 dns.rdatatype.A,
213 '127.0.0.1'))
214
6ca2e796
RG
215 for method in ("sendUDPQuery", "sendTCPQuery"):
216 sender = getattr(self, method)
217 (receivedQuery, receivedResponse) = sender(query, response)
218 self.assertTrue(receivedQuery)
219 self.assertTrue(receivedResponse)
220 receivedQuery.id = query.id
4bfebc93
CH
221 self.assertEqual(query, receivedQuery)
222 self.assertEqual(response, receivedResponse)
55baa1f2
RG
223
224class TestRecordsCountNoOPTInAR(DNSDistTest):
225
226 _config_template = """
d3ec24f9 227 addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, DNSQType.OPT, 0, 0)), RCodeAction(DNSRCode.REFUSED))
55baa1f2
RG
228 newServer{address="127.0.0.1:%s"}
229 """
230
231 def testRecordsCountRefuseOPTinAR(self):
232 """
233 RecordsTypeCount: Refuse OPT in AR
234
235 Send a query to "refuseoptinar.recordscount.tests.powerdns.com.",
236 check that we are getting a REFUSED response.
237 """
238 name = 'refuseoptinar.recordscount.tests.powerdns.com.'
239 query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
7af22479 240 query.flags &= ~dns.flags.RD
55baa1f2
RG
241 expectedResponse = dns.message.make_response(query)
242 expectedResponse.set_rcode(dns.rcode.REFUSED)
243
6ca2e796
RG
244 for method in ("sendUDPQuery", "sendTCPQuery"):
245 sender = getattr(self, method)
246 (_, receivedResponse) = sender(query, response=None, useQueue=False)
4bfebc93 247 self.assertEqual(receivedResponse, expectedResponse)
55baa1f2
RG
248
249 def testRecordsCountAllowNoOPTInAR(self):
250 """
251 RecordsTypeCount: Allow no OPT in AR
252
253 Send a query to "allownooptinar.recordscount.tests.powerdns.com.",
254 check that we are getting a valid response.
255 """
256 name = 'allowwnooptinar.recordscount.tests.powerdns.com.'
257 query = dns.message.make_query(name, 'A', 'IN')
258 response = dns.message.make_response(query)
259 response.answer.append(dns.rrset.from_text(name,
260 3600,
261 dns.rdataclass.IN,
262 dns.rdatatype.A,
263 '127.0.0.1'))
264
6ca2e796
RG
265 for method in ("sendUDPQuery", "sendTCPQuery"):
266 sender = getattr(self, method)
267 (receivedQuery, receivedResponse) = sender(query, response)
268 self.assertTrue(receivedQuery)
269 self.assertTrue(receivedResponse)
270 receivedQuery.id = query.id
4bfebc93
CH
271 self.assertEqual(query, receivedQuery)
272 self.assertEqual(response, receivedResponse)
65fc9d08
RG
273
274 def testRecordsCountAllowTwoARButNoOPT(self):
275 """
276 RecordsTypeCount: Allow arcount > 1 without OPT
277
278 Send a query to "allowtwoarnoopt.recordscount.tests.powerdns.com.",
279 check that we are getting a valid response.
280 """
281 name = 'allowtwoarnoopt.recordscount.tests.powerdns.com.'
282 query = dns.message.make_query(name, 'A', 'IN')
283 query.additional.append(dns.rrset.from_text(name,
284 3600,
285 dns.rdataclass.IN,
286 dns.rdatatype.A,
287 '127.0.0.1'))
288 query.additional.append(dns.rrset.from_text(name,
289 3600,
290 dns.rdataclass.IN,
291 dns.rdatatype.A,
292 '127.0.0.1'))
293
294 response = dns.message.make_response(query)
295 response.answer.append(dns.rrset.from_text(name,
296 3600,
297 dns.rdataclass.IN,
298 dns.rdatatype.A,
299 '127.0.0.1'))
300
6ca2e796
RG
301 for method in ("sendUDPQuery", "sendTCPQuery"):
302 sender = getattr(self, method)
303 (receivedQuery, receivedResponse) = sender(query, response)
304 self.assertTrue(receivedQuery)
305 self.assertTrue(receivedResponse)
306 receivedQuery.id = query.id
4bfebc93
CH
307 self.assertEqual(query, receivedQuery)
308 self.assertEqual(response, receivedResponse)