]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_RecordsCount.py
5 from dnsdisttests
import DNSDistTest
7 class TestRecordsCountOnlyOneAR(DNSDistTest
):
10 addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(DNSRCode.REFUSED))
11 newServer{address="127.0.0.1:%s"}
14 def testRecordsCountRefuseEmptyAR(self
):
16 RecordsCount: Refuse arcount == 0 (No OPT)
18 Send a query to "refuseemptyar.recordscount.tests.powerdns.com.",
19 check that we are getting a REFUSED response.
21 name
= 'refuseemptyar.recordscount.tests.powerdns.com.'
22 query
= dns
.message
.make_query(name
, 'A', 'IN')
23 query
.flags
&= ~dns
.flags
.RD
24 expectedResponse
= dns
.message
.make_response(query
)
25 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
27 for method
in ("sendUDPQuery", "sendTCPQuery"):
28 sender
= getattr(self
, method
)
29 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
30 self
.assertEquals(receivedResponse
, expectedResponse
)
32 def testRecordsCountAllowOneAR(self
):
34 RecordsCount: Allow arcount == 1 (OPT)
36 Send a query to "allowonear.recordscount.tests.powerdns.com.",
37 check that we are getting a valid response.
39 name
= 'allowonear.recordscount.tests.powerdns.com.'
40 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
41 response
= dns
.message
.make_response(query
)
42 response
.answer
.append(dns
.rrset
.from_text(name
,
48 for method
in ("sendUDPQuery", "sendTCPQuery"):
49 sender
= getattr(self
, method
)
50 (receivedQuery
, receivedResponse
) = sender(query
, response
)
51 self
.assertTrue(receivedQuery
)
52 self
.assertTrue(receivedResponse
)
53 receivedQuery
.id = query
.id
54 self
.assertEquals(query
, receivedQuery
)
55 self
.assertEquals(response
, receivedResponse
)
57 def testRecordsCountRefuseTwoAR(self
):
59 RecordsCount: Refuse arcount > 1 (OPT + a bogus additional record)
61 Send a query to "refusetwoar.recordscount.tests.powerdns.com.",
62 check that we are getting a REFUSED response.
64 name
= 'refusetwoar.recordscount.tests.powerdns.com.'
65 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
66 query
.flags
&= ~dns
.flags
.RD
67 query
.additional
.append(dns
.rrset
.from_text(name
,
72 expectedResponse
= dns
.message
.make_response(query
)
73 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
75 for method
in ("sendUDPQuery", "sendTCPQuery"):
76 sender
= getattr(self
, method
)
77 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
78 self
.assertEquals(receivedResponse
, expectedResponse
)
80 class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest
):
82 _config_template
= """
83 addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
84 addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
85 newServer{address="127.0.0.1:%s"}
88 def testRecordsCountRefuseOneAN(self
):
90 RecordsCount: Refuse ancount == 0
92 Send a query to "refusenoan.recordscount.tests.powerdns.com.",
93 check that we are getting a REFUSED response.
95 name
= 'refusenoan.recordscount.tests.powerdns.com.'
96 query
= dns
.message
.make_query(name
, 'A', 'IN')
97 query
.flags
&= ~dns
.flags
.RD
98 expectedResponse
= dns
.message
.make_response(query
)
99 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
101 for method
in ("sendUDPQuery", "sendTCPQuery"):
102 sender
= getattr(self
, method
)
103 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
104 self
.assertEquals(receivedResponse
, expectedResponse
)
106 def testRecordsCountAllowTwoAN(self
):
108 RecordsCount: Allow ancount == 2
110 Send a query to "allowtwoan.recordscount.tests.powerdns.com.",
111 check that we are getting a valid response.
113 name
= 'allowtwoan.recordscount.tests.powerdns.com.'
114 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
115 rrset
= dns
.rrset
.from_text_list(name
,
119 ['127.0.0.1', '127.0.0.2'])
120 query
.answer
.append(rrset
)
121 response
= dns
.message
.make_response(query
)
122 response
.answer
.append(rrset
)
124 for method
in ("sendUDPQuery", "sendTCPQuery"):
125 sender
= getattr(self
, method
)
126 (receivedQuery
, receivedResponse
) = sender(query
, response
)
127 self
.assertTrue(receivedQuery
)
128 self
.assertTrue(receivedResponse
)
129 receivedQuery
.id = query
.id
130 self
.assertEquals(query
, receivedQuery
)
131 self
.assertEquals(response
, receivedResponse
)
133 def testRecordsCountRefuseFourAN(self
):
135 RecordsCount: Refuse ancount > 3
137 Send a query to "refusefouran.recordscount.tests.powerdns.com.",
138 check that we are getting a REFUSED response.
140 name
= 'refusefouran.recordscount.tests.powerdns.com.'
141 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
142 query
.flags
&= ~dns
.flags
.RD
143 rrset
= dns
.rrset
.from_text_list(name
,
147 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4'])
148 query
.answer
.append(rrset
)
150 expectedResponse
= dns
.message
.make_response(query
)
151 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
152 expectedResponse
.answer
.append(rrset
)
154 for method
in ("sendUDPQuery", "sendTCPQuery"):
155 sender
= getattr(self
, method
)
156 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
157 self
.assertEquals(receivedResponse
, expectedResponse
)
159 class TestRecordsCountNothingInNS(DNSDistTest
):
161 _config_template
= """
162 addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
163 addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
164 newServer{address="127.0.0.1:%s"}
167 def testRecordsCountRefuseNS(self
):
169 RecordsCount: Refuse nscount != 0
171 Send a query to "refusens.recordscount.tests.powerdns.com.",
172 check that we are getting a REFUSED response.
174 name
= 'refusens.recordscount.tests.powerdns.com.'
175 query
= dns
.message
.make_query(name
, 'A', 'IN')
176 rrset
= dns
.rrset
.from_text(name
,
180 'ns.tests.powerdns.com.')
181 query
.authority
.append(rrset
)
182 query
.flags
&= ~dns
.flags
.RD
183 expectedResponse
= dns
.message
.make_response(query
)
184 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
185 expectedResponse
.authority
.append(rrset
)
187 for method
in ("sendUDPQuery", "sendTCPQuery"):
188 sender
= getattr(self
, method
)
189 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
190 self
.assertEquals(receivedResponse
, expectedResponse
)
193 def testRecordsCountAllowEmptyNS(self
):
195 RecordsCount: Allow nscount == 0
197 Send a query to "allowns.recordscount.tests.powerdns.com.",
198 check that we are getting a valid response.
200 name
= 'allowns.recordscount.tests.powerdns.com.'
201 query
= dns
.message
.make_query(name
, 'A', 'IN')
202 response
= dns
.message
.make_response(query
)
203 response
.answer
.append(dns
.rrset
.from_text(name
,
209 for method
in ("sendUDPQuery", "sendTCPQuery"):
210 sender
= getattr(self
, method
)
211 (receivedQuery
, receivedResponse
) = sender(query
, response
)
212 self
.assertTrue(receivedQuery
)
213 self
.assertTrue(receivedResponse
)
214 receivedQuery
.id = query
.id
215 self
.assertEquals(query
, receivedQuery
)
216 self
.assertEquals(response
, receivedResponse
)
218 class TestRecordsCountNoOPTInAR(DNSDistTest
):
220 _config_template
= """
221 addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, DNSQType.OPT, 0, 0)), RCodeAction(DNSRCode.REFUSED))
222 newServer{address="127.0.0.1:%s"}
225 def testRecordsCountRefuseOPTinAR(self
):
227 RecordsTypeCount: Refuse OPT in AR
229 Send a query to "refuseoptinar.recordscount.tests.powerdns.com.",
230 check that we are getting a REFUSED response.
232 name
= 'refuseoptinar.recordscount.tests.powerdns.com.'
233 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
234 query
.flags
&= ~dns
.flags
.RD
235 expectedResponse
= dns
.message
.make_response(query
)
236 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
238 for method
in ("sendUDPQuery", "sendTCPQuery"):
239 sender
= getattr(self
, method
)
240 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
241 self
.assertEquals(receivedResponse
, expectedResponse
)
243 def testRecordsCountAllowNoOPTInAR(self
):
245 RecordsTypeCount: Allow no OPT in AR
247 Send a query to "allownooptinar.recordscount.tests.powerdns.com.",
248 check that we are getting a valid response.
250 name
= 'allowwnooptinar.recordscount.tests.powerdns.com.'
251 query
= dns
.message
.make_query(name
, 'A', 'IN')
252 response
= dns
.message
.make_response(query
)
253 response
.answer
.append(dns
.rrset
.from_text(name
,
259 for method
in ("sendUDPQuery", "sendTCPQuery"):
260 sender
= getattr(self
, method
)
261 (receivedQuery
, receivedResponse
) = sender(query
, response
)
262 self
.assertTrue(receivedQuery
)
263 self
.assertTrue(receivedResponse
)
264 receivedQuery
.id = query
.id
265 self
.assertEquals(query
, receivedQuery
)
266 self
.assertEquals(response
, receivedResponse
)
268 def testRecordsCountAllowTwoARButNoOPT(self
):
270 RecordsTypeCount: Allow arcount > 1 without OPT
272 Send a query to "allowtwoarnoopt.recordscount.tests.powerdns.com.",
273 check that we are getting a valid response.
275 name
= 'allowtwoarnoopt.recordscount.tests.powerdns.com.'
276 query
= dns
.message
.make_query(name
, 'A', 'IN')
277 query
.additional
.append(dns
.rrset
.from_text(name
,
282 query
.additional
.append(dns
.rrset
.from_text(name
,
288 response
= dns
.message
.make_response(query
)
289 response
.answer
.append(dns
.rrset
.from_text(name
,
295 for method
in ("sendUDPQuery", "sendTCPQuery"):
296 sender
= getattr(self
, method
)
297 (receivedQuery
, receivedResponse
) = sender(query
, response
)
298 self
.assertTrue(receivedQuery
)
299 self
.assertTrue(receivedResponse
)
300 receivedQuery
.id = query
.id
301 self
.assertEquals(query
, receivedQuery
)
302 self
.assertEquals(response
, receivedResponse
)