]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_RecordsCount.py
Merge pull request #8795 from omoerbeek/rec-lua-docs-policytag
[thirdparty/pdns.git] / regression-tests.dnsdist / test_RecordsCount.py
1 #!/usr/bin/env python
2 import copy
3 import os
4 import dns
5 from dnsdisttests import DNSDistTest
6
class TestRecordsCountOnlyOneAR(DNSDistTest):
    """
    RecordsCountRule applied to the additional section.

    The configuration below answers REFUSED to every query for which
    RecordsCountRule(DNSSection.Additional, 1, 1) does not match; the tests
    exercise arcount == 0, arcount == 1 and arcount > 1, over UDP and TCP.
    """

    _config_template = """
    addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseEmptyAR(self):
        """
        RecordsCount: Refuse arcount == 0 (No OPT)

        Send a query to "refuseemptyar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refuseemptyar.recordscount.tests.powerdns.com.'
        # No EDNS, so the additional section of the query is empty.
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # No backend response is supplied (response=None, useQueue=False):
            # the REFUSED answer is expected to come from the rule above.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowOneAR(self):
        """
        RecordsCount: Allow arcount == 1 (OPT)

        Send a query to "allowonear.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowonear.recordscount.tests.powerdns.com.'
        # use_edns=True adds the OPT record, i.e. exactly one additional record.
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so the comparison covers the rest of the message.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountRefuseTwoAR(self):
        """
        RecordsCount: Refuse arcount > 1 (OPT + a bogus additional record)

        Send a query to "refusetwoar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusetwoar.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        query.flags &= ~dns.flags.RD
        # Bogus A record on top of the OPT one, bringing arcount above 1.
        query.additional.append(dns.rrset.from_text(name,
                                                    3600,
                                                    dns.rdataclass.IN,
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
79
class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
    """
    RecordsCountRule applied to the answer section.

    The configuration below allows queries matching
    RecordsCountRule(DNSSection.Answer, 2, 3) and answers REFUSED to
    everything else; the tests exercise ancount == 0, ancount == 2 and
    ancount == 4, over UDP and TCP.
    """

    _config_template = """
    addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
    addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseOneAN(self):
        """
        RecordsCount: Refuse ancount == 0

        Send a query to "refusenoan.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        # NOTE: despite the "OneAN" in the method name, the query built here
        # carries no answer record at all (ancount == 0). The name is kept
        # unchanged so existing test identifiers remain stable.
        name = 'refusenoan.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # No backend response is supplied (response=None, useQueue=False):
            # the REFUSED answer is expected to come from the rules above.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowTwoAN(self):
        """
        RecordsCount: Allow ancount == 2

        Send a query to "allowtwoan.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowtwoan.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        # Two A records placed in the answer section of the *query*.
        rrset = dns.rrset.from_text_list(name,
                                         3600,
                                         dns.rdataclass.IN,
                                         dns.rdatatype.A,
                                         ['127.0.0.1', '127.0.0.2'])
        query.answer.append(rrset)
        response = dns.message.make_response(query)
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so the comparison covers the rest of the message.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountRefuseFourAN(self):
        """
        RecordsCount: Refuse ancount > 3

        Send a query to "refusefouran.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusefouran.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        query.flags &= ~dns.flags.RD
        # Four A records in the answer section of the query, one more than
        # the upper bound configured above.
        rrset = dns.rrset.from_text_list(name,
                                         3600,
                                         dns.rdataclass.IN,
                                         dns.rdatatype.A,
                                         ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4'])
        query.answer.append(rrset)

        # The expected REFUSED response still carries the answer records
        # that were present in the query.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
158
class TestRecordsCountNothingInNS(DNSDistTest):
    """
    RecordsCountRule applied to the authority section.

    The configuration below allows queries matching
    RecordsCountRule(DNSSection.Authority, 0, 0) and answers REFUSED to
    everything else; the tests exercise nscount != 0 and nscount == 0,
    over UDP and TCP.
    """

    _config_template = """
    addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
    addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseNS(self):
        """
        RecordsCount: Refuse nscount != 0

        Send a query to "refusens.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusens.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # One NS record placed in the authority section of the *query*.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.NS,
                                    'ns.tests.powerdns.com.')
        query.authority.append(rrset)
        query.flags &= ~dns.flags.RD
        # The expected REFUSED response still carries the authority record
        # that was present in the query.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        expectedResponse.authority.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # No backend response is supplied (response=None, useQueue=False):
            # the REFUSED answer is expected to come from the rules above.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowEmptyNS(self):
        """
        RecordsCount: Allow nscount == 0

        Send a query to "allowns.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowns.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so the comparison covers the rest of the message.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
217
class TestRecordsCountNoOPTInAR(DNSDistTest):
    """
    RecordsTypeCountRule applied to OPT records in the additional section.

    The configuration below answers REFUSED to every query for which
    RecordsTypeCountRule(DNSSection.Additional, DNSQType.OPT, 0, 0) does not
    match; the tests exercise a query with OPT, a query without any
    additional record, and a query with two non-OPT additional records,
    over UDP and TCP.
    """

    _config_template = """
    addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, DNSQType.OPT, 0, 0)), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseOPTinAR(self):
        """
        RecordsTypeCount: Refuse OPT in AR

        Send a query to "refuseoptinar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refuseoptinar.recordscount.tests.powerdns.com.'
        # use_edns=True adds an OPT record to the additional section.
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # No backend response is supplied (response=None, useQueue=False):
            # the REFUSED answer is expected to come from the rule above.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowNoOPTInAR(self):
        """
        RecordsTypeCount: Allow no OPT in AR

        Send a query to "allownooptinar.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        # The name previously read 'allowwnooptinar...' (doubled 'w'), which
        # did not match the name documented above.
        name = 'allownooptinar.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so the comparison covers the rest of the message.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountAllowTwoARButNoOPT(self):
        """
        RecordsTypeCount: Allow arcount > 1 without OPT

        Send a query to "allowtwoarnoopt.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowtwoarnoopt.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # Two A rrsets appended to the additional section, none of them OPT.
        query.additional.append(dns.rrset.from_text(name,
                                                    3600,
                                                    dns.rdataclass.IN,
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))
        query.additional.append(dns.rrset.from_text(name,
                                                    3600,
                                                    dns.rdataclass.IN,
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))

        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs so the comparison covers the rest of the message.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)