]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_RecordsCount.py
5 from dnsdisttests
import DNSDistTest
7 class TestRecordsCountOnlyOneAR(DNSDistTest
):
10 addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(DNSRCode.REFUSED))
11 newServer{address="127.0.0.1:%s"}
14 def testRecordsCountRefuseEmptyAR(self
):
16 RecordsCount: Refuse arcount == 0 (No OPT)
18 Send a query to "refuseemptyar.recordscount.tests.powerdns.com.",
19 check that we are getting a REFUSED response.
21 name
= 'refuseemptyar.recordscount.tests.powerdns.com.'
22 query
= dns
.message
.make_query(name
, 'A', 'IN')
23 query
.flags
&= ~dns
.flags
.RD
24 expectedResponse
= dns
.message
.make_response(query
)
25 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
27 for method
in ("sendUDPQuery", "sendTCPQuery"):
28 sender
= getattr(self
, method
)
29 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
30 self
.assertEqual(receivedResponse
, expectedResponse
)
32 def testRecordsCountAllowOneAR(self
):
34 RecordsCount: Allow arcount == 1 (OPT)
36 Send a query to "allowonear.recordscount.tests.powerdns.com.",
37 check that we are getting a valid response.
39 name
= 'allowonear.recordscount.tests.powerdns.com.'
40 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
41 response
= dns
.message
.make_response(query
)
42 response
.answer
.append(dns
.rrset
.from_text(name
,
48 for method
in ("sendUDPQuery", "sendTCPQuery"):
49 sender
= getattr(self
, method
)
50 (receivedQuery
, receivedResponse
) = sender(query
, response
)
51 self
.assertTrue(receivedQuery
)
52 self
.assertTrue(receivedResponse
)
53 receivedQuery
.id = query
.id
54 self
.assertEqual(query
, receivedQuery
)
55 self
.assertEqual(response
, receivedResponse
)
57 def testRecordsCountRefuseTwoAR(self
):
59 RecordsCount: Refuse arcount > 1 (OPT + a bogus additional record)
61 Send a query to "refusetwoar.recordscount.tests.powerdns.com.",
62 check that we are getting a REFUSED response.
64 name
= 'refusetwoar.recordscount.tests.powerdns.com.'
65 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
66 query
.flags
&= ~dns
.flags
.RD
67 query
.additional
.append(dns
.rrset
.from_text(name
,
72 expectedResponse
= dns
.message
.make_response(query
)
73 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
74 # this is not great, we should fix that!
75 expectedResponse
.additional
.append(dns
.rrset
.from_text(name
,
81 for method
in ("sendUDPQuery", "sendTCPQuery"):
82 sender
= getattr(self
, method
)
83 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
84 self
.assertEqual(receivedResponse
, expectedResponse
)
86 class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest
):
88 _config_template
= """
89 addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
90 addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
91 newServer{address="127.0.0.1:%s"}
94 def testRecordsCountRefuseOneAN(self
):
96 RecordsCount: Refuse ancount == 0
98 Send a query to "refusenoan.recordscount.tests.powerdns.com.",
99 check that we are getting a REFUSED response.
101 name
= 'refusenoan.recordscount.tests.powerdns.com.'
102 query
= dns
.message
.make_query(name
, 'A', 'IN')
103 query
.flags
&= ~dns
.flags
.RD
104 expectedResponse
= dns
.message
.make_response(query
)
105 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
107 for method
in ("sendUDPQuery", "sendTCPQuery"):
108 sender
= getattr(self
, method
)
109 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
110 self
.assertEqual(receivedResponse
, expectedResponse
)
112 def testRecordsCountAllowTwoAN(self
):
114 RecordsCount: Allow ancount == 2
116 Send a query to "allowtwoan.recordscount.tests.powerdns.com.",
117 check that we are getting a valid response.
119 name
= 'allowtwoan.recordscount.tests.powerdns.com.'
120 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
121 rrset
= dns
.rrset
.from_text_list(name
,
125 ['127.0.0.1', '127.0.0.2'])
126 query
.answer
.append(rrset
)
127 response
= dns
.message
.make_response(query
)
128 response
.answer
.append(rrset
)
130 for method
in ("sendUDPQuery", "sendTCPQuery"):
131 sender
= getattr(self
, method
)
132 (receivedQuery
, receivedResponse
) = sender(query
, response
)
133 self
.assertTrue(receivedQuery
)
134 self
.assertTrue(receivedResponse
)
135 receivedQuery
.id = query
.id
136 self
.assertEqual(query
, receivedQuery
)
137 self
.assertEqual(response
, receivedResponse
)
139 def testRecordsCountRefuseFourAN(self
):
141 RecordsCount: Refuse ancount > 3
143 Send a query to "refusefouran.recordscount.tests.powerdns.com.",
144 check that we are getting a REFUSED response.
146 name
= 'refusefouran.recordscount.tests.powerdns.com.'
147 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
148 query
.flags
&= ~dns
.flags
.RD
149 rrset
= dns
.rrset
.from_text_list(name
,
153 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4'])
154 query
.answer
.append(rrset
)
156 expectedResponse
= dns
.message
.make_response(query
)
157 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
158 expectedResponse
.answer
.append(rrset
)
160 for method
in ("sendUDPQuery", "sendTCPQuery"):
161 sender
= getattr(self
, method
)
162 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
163 self
.assertEqual(receivedResponse
, expectedResponse
)
165 class TestRecordsCountNothingInNS(DNSDistTest
):
167 _config_template
= """
168 addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
169 addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
170 newServer{address="127.0.0.1:%s"}
173 def testRecordsCountRefuseNS(self
):
175 RecordsCount: Refuse nscount != 0
177 Send a query to "refusens.recordscount.tests.powerdns.com.",
178 check that we are getting a REFUSED response.
180 name
= 'refusens.recordscount.tests.powerdns.com.'
181 query
= dns
.message
.make_query(name
, 'A', 'IN')
182 rrset
= dns
.rrset
.from_text(name
,
186 'ns.tests.powerdns.com.')
187 query
.authority
.append(rrset
)
188 query
.flags
&= ~dns
.flags
.RD
189 expectedResponse
= dns
.message
.make_response(query
)
190 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
191 expectedResponse
.authority
.append(rrset
)
193 for method
in ("sendUDPQuery", "sendTCPQuery"):
194 sender
= getattr(self
, method
)
195 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
196 self
.assertEqual(receivedResponse
, expectedResponse
)
199 def testRecordsCountAllowEmptyNS(self
):
201 RecordsCount: Allow nscount == 0
203 Send a query to "allowns.recordscount.tests.powerdns.com.",
204 check that we are getting a valid response.
206 name
= 'allowns.recordscount.tests.powerdns.com.'
207 query
= dns
.message
.make_query(name
, 'A', 'IN')
208 response
= dns
.message
.make_response(query
)
209 response
.answer
.append(dns
.rrset
.from_text(name
,
215 for method
in ("sendUDPQuery", "sendTCPQuery"):
216 sender
= getattr(self
, method
)
217 (receivedQuery
, receivedResponse
) = sender(query
, response
)
218 self
.assertTrue(receivedQuery
)
219 self
.assertTrue(receivedResponse
)
220 receivedQuery
.id = query
.id
221 self
.assertEqual(query
, receivedQuery
)
222 self
.assertEqual(response
, receivedResponse
)
224 class TestRecordsCountNoOPTInAR(DNSDistTest
):
226 _config_template
= """
227 addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, DNSQType.OPT, 0, 0)), RCodeAction(DNSRCode.REFUSED))
228 newServer{address="127.0.0.1:%s"}
231 def testRecordsCountRefuseOPTinAR(self
):
233 RecordsTypeCount: Refuse OPT in AR
235 Send a query to "refuseoptinar.recordscount.tests.powerdns.com.",
236 check that we are getting a REFUSED response.
238 name
= 'refuseoptinar.recordscount.tests.powerdns.com.'
239 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
240 query
.flags
&= ~dns
.flags
.RD
241 expectedResponse
= dns
.message
.make_response(query
)
242 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
244 for method
in ("sendUDPQuery", "sendTCPQuery"):
245 sender
= getattr(self
, method
)
246 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
247 self
.assertEqual(receivedResponse
, expectedResponse
)
249 def testRecordsCountAllowNoOPTInAR(self
):
251 RecordsTypeCount: Allow no OPT in AR
253 Send a query to "allownooptinar.recordscount.tests.powerdns.com.",
254 check that we are getting a valid response.
256 name
= 'allowwnooptinar.recordscount.tests.powerdns.com.'
257 query
= dns
.message
.make_query(name
, 'A', 'IN')
258 response
= dns
.message
.make_response(query
)
259 response
.answer
.append(dns
.rrset
.from_text(name
,
265 for method
in ("sendUDPQuery", "sendTCPQuery"):
266 sender
= getattr(self
, method
)
267 (receivedQuery
, receivedResponse
) = sender(query
, response
)
268 self
.assertTrue(receivedQuery
)
269 self
.assertTrue(receivedResponse
)
270 receivedQuery
.id = query
.id
271 self
.assertEqual(query
, receivedQuery
)
272 self
.assertEqual(response
, receivedResponse
)
274 def testRecordsCountAllowTwoARButNoOPT(self
):
276 RecordsTypeCount: Allow arcount > 1 without OPT
278 Send a query to "allowtwoarnoopt.recordscount.tests.powerdns.com.",
279 check that we are getting a valid response.
281 name
= 'allowtwoarnoopt.recordscount.tests.powerdns.com.'
282 query
= dns
.message
.make_query(name
, 'A', 'IN')
283 query
.additional
.append(dns
.rrset
.from_text(name
,
288 query
.additional
.append(dns
.rrset
.from_text(name
,
294 response
= dns
.message
.make_response(query
)
295 response
.answer
.append(dns
.rrset
.from_text(name
,
301 for method
in ("sendUDPQuery", "sendTCPQuery"):
302 sender
= getattr(self
, method
)
303 (receivedQuery
, receivedResponse
) = sender(query
, response
)
304 self
.assertTrue(receivedQuery
)
305 self
.assertTrue(receivedResponse
)
306 receivedQuery
.id = query
.id
307 self
.assertEqual(query
, receivedQuery
)
308 self
.assertEqual(response
, receivedResponse
)