git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_RecordsCount.py
Merge pull request #13567 from omoerbeek/rec-disable-sl-deprecated
[thirdparty/pdns.git] / regression-tests.dnsdist / test_RecordsCount.py
1 #!/usr/bin/env python
2 import copy
3 import os
4 import dns
5 from dnsdisttests import DNSDistTest
6
class TestRecordsCountOnlyOneAR(DNSDistTest):
    """
    RecordsCountRule applied to the additional section.

    The configuration below answers REFUSED to any query whose additional
    section does not hold exactly one record (minimum 1, maximum 1).
    """

    # NOTE(review): the %s placeholder is presumably substituted by
    # DNSDistTest with the backend port -- confirm in dnsdisttests.
    _config_template = """
    addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseEmptyAR(self):
        """
        RecordsCount: Refuse arcount == 0 (No OPT)

        Query "refuseemptyar.recordscount.tests.powerdns.com." without EDNS
        and expect a REFUSED answer over both UDP and TCP.
        """
        qname = 'refuseemptyar.recordscount.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'A', 'IN')
        # Drop the RD bit before deriving the expected response from the query.
        request.flags = request.flags & ~dns.flags.RD
        refused = dns.message.make_response(request)
        refused.set_rcode(dns.rcode.REFUSED)

        transports = ("sendUDPQuery", "sendTCPQuery")
        for transport in transports:
            send = getattr(self, transport)
            # NOTE(review): response=None / useQueue=False presumably means no
            # backend answer is queued, since dnsdist should reply by itself.
            _, reply = send(request, response=None, useQueue=False)
            self.assertEqual(reply, refused)

    def testRecordsCountAllowOneAR(self):
        """
        RecordsCount: Allow arcount == 1 (OPT)

        Query "allowonear.recordscount.tests.powerdns.com." with EDNS enabled
        and expect the query and the backend response to pass through
        unchanged.
        """
        qname = 'allowonear.recordscount.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'A', 'IN', use_edns=True)
        addressRecord = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        backendResponse = dns.message.make_response(request)
        backendResponse.answer.append(addressRecord)

        transports = ("sendUDPQuery", "sendTCPQuery")
        for transport in transports:
            send = getattr(self, transport)
            seenQuery, reply = send(request, backendResponse)
            self.assertTrue(seenQuery)
            self.assertTrue(reply)
            # Align the IDs so that the message comparison below only looks
            # at the rest of the query.
            seenQuery.id = request.id
            self.assertEqual(request, seenQuery)
            self.assertEqual(backendResponse, reply)

    def testRecordsCountRefuseTwoAR(self):
        """
        RecordsCount: Refuse arcount > 1 (OPT + a bogus additional record)

        Query "refusetwoar.recordscount.tests.powerdns.com." with EDNS plus
        one extra A record in the additional section and expect a REFUSED
        answer.
        """
        qname = 'refusetwoar.recordscount.tests.powerdns.com.'

        def makeBogusRecord():
            # A fresh rrset on every call, so the query and the expected
            # response never share an object.
            return dns.rrset.from_text(
                qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')

        request = dns.message.make_query(qname, 'A', 'IN', use_edns=True)
        request.flags = request.flags & ~dns.flags.RD
        request.additional.append(makeBogusRecord())

        refused = dns.message.make_response(request)
        refused.set_rcode(dns.rcode.REFUSED)
        # The expected REFUSED answer still carries the bogus additional
        # record. This is not great, we should fix that!
        refused.additional.append(makeBogusRecord())

        transports = ("sendUDPQuery", "sendTCPQuery")
        for transport in transports:
            send = getattr(self, transport)
            _, reply = send(request, response=None, useQueue=False)
            self.assertEqual(reply, refused)
85
class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
    """
    RecordsCountRule applied to the answer section.

    The configuration below allows queries carrying two or three records in
    their answer section and answers REFUSED to everything else.
    """

    # NOTE(review): the %s placeholder is presumably substituted by
    # DNSDistTest with the backend port -- confirm in dnsdisttests.
    _config_template = """
    addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
    addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseOneAN(self):
        """
        RecordsCount: Refuse ancount == 0

        Query "refusenoan.recordscount.tests.powerdns.com." with an empty
        answer section and expect a REFUSED answer.
        """
        # NOTE(review): the method name says "OneAN" while the query has no
        # answer record at all; name kept so the test ID does not change.
        qname = 'refusenoan.recordscount.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'A', 'IN')
        # Drop the RD bit before deriving the expected response from the query.
        request.flags = request.flags & ~dns.flags.RD
        refused = dns.message.make_response(request)
        refused.set_rcode(dns.rcode.REFUSED)

        transports = ("sendUDPQuery", "sendTCPQuery")
        for transport in transports:
            send = getattr(self, transport)
            # NOTE(review): response=None / useQueue=False presumably means no
            # backend answer is queued, since dnsdist should reply by itself.
            _, reply = send(request, response=None, useQueue=False)
            self.assertEqual(reply, refused)

    def testRecordsCountAllowTwoAN(self):
        """
        RecordsCount: Allow ancount == 2

        Query "allowtwoan.recordscount.tests.powerdns.com." with two A
        records in the answer section and expect the query and the backend
        response to pass through unchanged.
        """
        qname = 'allowtwoan.recordscount.tests.powerdns.com.'
        addresses = ['127.0.0.1', '127.0.0.2']
        twoRecords = dns.rrset.from_text_list(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, addresses)

        request = dns.message.make_query(qname, 'A', 'IN', use_edns=True)
        request.answer.append(twoRecords)
        backendResponse = dns.message.make_response(request)
        backendResponse.answer.append(twoRecords)

        transports = ("sendUDPQuery", "sendTCPQuery")
        for transport in transports:
            send = getattr(self, transport)
            seenQuery, reply = send(request, backendResponse)
            self.assertTrue(seenQuery)
            self.assertTrue(reply)
            # Align the IDs so that the message comparison below only looks
            # at the rest of the query.
            seenQuery.id = request.id
            self.assertEqual(request, seenQuery)
            self.assertEqual(backendResponse, reply)

    def testRecordsCountRefuseFourAN(self):
        """
        RecordsCount: Refuse ancount > 3

        Query "refusefouran.recordscount.tests.powerdns.com." with four A
        records in the answer section and expect a REFUSED answer.
        """
        qname = 'refusefouran.recordscount.tests.powerdns.com.'
        addresses = ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4']
        fourRecords = dns.rrset.from_text_list(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, addresses)

        request = dns.message.make_query(qname, 'A', 'IN', use_edns=True)
        request.flags = request.flags & ~dns.flags.RD
        request.answer.append(fourRecords)

        refused = dns.message.make_response(request)
        refused.set_rcode(dns.rcode.REFUSED)
        # The expected REFUSED answer carries the answer records of the query.
        refused.answer.append(fourRecords)

        transports = ("sendUDPQuery", "sendTCPQuery")
        for transport in transports:
            send = getattr(self, transport)
            _, reply = send(request, response=None, useQueue=False)
            self.assertEqual(reply, refused)
164
class TestRecordsCountNothingInNS(DNSDistTest):
    """
    RecordsCountRule applied to the authority section.

    The configuration below allows queries with an empty authority section
    and answers REFUSED to everything else.
    """

    # NOTE(review): the %s placeholder is presumably substituted by
    # DNSDistTest with the backend port -- confirm in dnsdisttests.
    _config_template = """
    addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
    addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseNS(self):
        """
        RecordsCount: Refuse nscount != 0

        Query "refusens.recordscount.tests.powerdns.com." with one NS record
        in the authority section and expect a REFUSED answer.
        """
        qname = 'refusens.recordscount.tests.powerdns.com.'
        nsRecord = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.NS, 'ns.tests.powerdns.com.')

        request = dns.message.make_query(qname, 'A', 'IN')
        request.authority.append(nsRecord)
        # Drop the RD bit before deriving the expected response from the query.
        request.flags = request.flags & ~dns.flags.RD

        refused = dns.message.make_response(request)
        refused.set_rcode(dns.rcode.REFUSED)
        # The expected REFUSED answer carries the authority record of the query.
        refused.authority.append(nsRecord)

        transports = ("sendUDPQuery", "sendTCPQuery")
        for transport in transports:
            send = getattr(self, transport)
            # NOTE(review): response=None / useQueue=False presumably means no
            # backend answer is queued, since dnsdist should reply by itself.
            _, reply = send(request, response=None, useQueue=False)
            self.assertEqual(reply, refused)

    def testRecordsCountAllowEmptyNS(self):
        """
        RecordsCount: Allow nscount == 0

        Query "allowns.recordscount.tests.powerdns.com." with an empty
        authority section and expect the query and the backend response to
        pass through unchanged.
        """
        qname = 'allowns.recordscount.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'A', 'IN')
        addressRecord = dns.rrset.from_text(
            qname, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        backendResponse = dns.message.make_response(request)
        backendResponse.answer.append(addressRecord)

        transports = ("sendUDPQuery", "sendTCPQuery")
        for transport in transports:
            send = getattr(self, transport)
            seenQuery, reply = send(request, backendResponse)
            self.assertTrue(seenQuery)
            self.assertTrue(reply)
            # Align the IDs so that the message comparison below only looks
            # at the rest of the query.
            seenQuery.id = request.id
            self.assertEqual(request, seenQuery)
            self.assertEqual(backendResponse, reply)
223
class TestRecordsCountNoOPTInAR(DNSDistTest):
    """
    RecordsTypeCountRule applied to OPT records in the additional section.

    The configuration below answers REFUSED to any query whose additional
    section does not contain exactly zero OPT records (minimum 0, maximum 0).
    Other record types in that section are not counted by the rule.
    """

    # NOTE(review): the %s placeholder is presumably substituted by
    # DNSDistTest with the backend port -- confirm in dnsdisttests.
    _config_template = """
    addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, DNSQType.OPT, 0, 0)), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseOPTinAR(self):
        """
        RecordsTypeCount: Refuse OPT in AR

        Send a query to "refuseoptinar.recordscount.tests.powerdns.com."
        with EDNS enabled, check that we are getting a REFUSED response.
        """
        name = 'refuseoptinar.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        # Drop the RD bit before deriving the expected response from the query.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # NOTE(review): response=None / useQueue=False presumably means no
            # backend answer is queued, since dnsdist should reply by itself.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowNoOPTInAR(self):
        """
        RecordsTypeCount: Allow no OPT in AR

        Send a query to "allownooptinar.recordscount.tests.powerdns.com."
        without EDNS, check that we are getting a valid response.
        """
        # The literal used to read "allowwnooptinar" (doubled "w"), which did
        # not match the name documented above.
        name = 'allownooptinar.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so that the message comparison below only looks
            # at the rest of the query.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountAllowTwoARButNoOPT(self):
        """
        RecordsTypeCount: Allow arcount > 1 without OPT

        Send a query to "allowtwoarnoopt.recordscount.tests.powerdns.com."
        with two A records (and no OPT) in the additional section,
        check that we are getting a valid response.
        """
        name = 'allowtwoarnoopt.recordscount.tests.powerdns.com.'

        def makeARecord():
            # A fresh rrset on every call: the two additional entries and the
            # answer entry stay distinct objects, as they were when each was
            # built inline.
            return dns.rrset.from_text(name,
                                       3600,
                                       dns.rdataclass.IN,
                                       dns.rdatatype.A,
                                       '127.0.0.1')

        query = dns.message.make_query(name, 'A', 'IN')
        query.additional.append(makeARecord())
        query.additional.append(makeARecord())

        response = dns.message.make_response(query)
        response.answer.append(makeARecord())

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so that the message comparison below only looks
            # at the rest of the query.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)