]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_RecordsCount.py
5 from dnsdisttests
import DNSDistTest
7 class TestRecordsCountOnlyOneAR(DNSDistTest
):
10 addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(dnsdist.REFUSED))
11 newServer{address="127.0.0.1:%s"}
14 def testRecordsCountRefuseEmptyAR(self
):
16 RecordsCount: Refuse arcount == 0 (No OPT)
18 Send a query to "refuseemptyar.recordscount.tests.powerdns.com.",
19 check that we are getting a REFUSED response.
21 name
= 'refuseemptyar.recordscount.tests.powerdns.com.'
22 query
= dns
.message
.make_query(name
, 'A', 'IN')
23 expectedResponse
= dns
.message
.make_response(query
)
24 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
26 for method
in ("sendUDPQuery", "sendTCPQuery"):
27 sender
= getattr(self
, method
)
28 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
29 self
.assertEquals(receivedResponse
, expectedResponse
)
31 def testRecordsCountAllowOneAR(self
):
33 RecordsCount: Allow arcount == 1 (OPT)
35 Send a query to "allowonear.recordscount.tests.powerdns.com.",
36 check that we are getting a valid response.
38 name
= 'allowonear.recordscount.tests.powerdns.com.'
39 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
40 response
= dns
.message
.make_response(query
)
41 response
.answer
.append(dns
.rrset
.from_text(name
,
47 for method
in ("sendUDPQuery", "sendTCPQuery"):
48 sender
= getattr(self
, method
)
49 (receivedQuery
, receivedResponse
) = sender(query
, response
)
50 self
.assertTrue(receivedQuery
)
51 self
.assertTrue(receivedResponse
)
52 receivedQuery
.id = query
.id
53 self
.assertEquals(query
, receivedQuery
)
54 self
.assertEquals(response
, receivedResponse
)
56 def testRecordsCountRefuseTwoAR(self
):
58 RecordsCount: Refuse arcount > 1 (OPT + a bogus additional record)
60 Send a query to "refusetwoar.recordscount.tests.powerdns.com.",
61 check that we are getting a REFUSED response.
63 name
= 'refusetwoar.recordscount.tests.powerdns.com.'
64 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
65 query
.additional
.append(dns
.rrset
.from_text(name
,
70 expectedResponse
= dns
.message
.make_response(query
)
71 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
73 for method
in ("sendUDPQuery", "sendTCPQuery"):
74 sender
= getattr(self
, method
)
75 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
76 self
.assertEquals(receivedResponse
, expectedResponse
)
78 class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest
):
80 _config_template
= """
81 addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
82 addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
83 newServer{address="127.0.0.1:%s"}
86 def testRecordsCountRefuseOneAN(self
):
88 RecordsCount: Refuse ancount == 0
90 Send a query to "refusenoan.recordscount.tests.powerdns.com.",
91 check that we are getting a REFUSED response.
93 name
= 'refusenoan.recordscount.tests.powerdns.com.'
94 query
= dns
.message
.make_query(name
, 'A', 'IN')
95 expectedResponse
= dns
.message
.make_response(query
)
96 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
98 for method
in ("sendUDPQuery", "sendTCPQuery"):
99 sender
= getattr(self
, method
)
100 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
101 self
.assertEquals(receivedResponse
, expectedResponse
)
103 def testRecordsCountAllowTwoAN(self
):
105 RecordsCount: Allow ancount == 2
107 Send a query to "allowtwoan.recordscount.tests.powerdns.com.",
108 check that we are getting a valid response.
110 name
= 'allowtwoan.recordscount.tests.powerdns.com.'
111 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
112 rrset
= dns
.rrset
.from_text_list(name
,
116 ['127.0.0.1', '127.0.0.2'])
117 query
.answer
.append(rrset
)
118 response
= dns
.message
.make_response(query
)
119 response
.answer
.append(rrset
)
121 for method
in ("sendUDPQuery", "sendTCPQuery"):
122 sender
= getattr(self
, method
)
123 (receivedQuery
, receivedResponse
) = sender(query
, response
)
124 self
.assertTrue(receivedQuery
)
125 self
.assertTrue(receivedResponse
)
126 receivedQuery
.id = query
.id
127 self
.assertEquals(query
, receivedQuery
)
128 self
.assertEquals(response
, receivedResponse
)
130 def testRecordsCountRefuseFourAN(self
):
132 RecordsCount: Refuse ancount > 3
134 Send a query to "refusefouran.recordscount.tests.powerdns.com.",
135 check that we are getting a REFUSED response.
137 name
= 'refusefouran.recordscount.tests.powerdns.com.'
138 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
139 rrset
= dns
.rrset
.from_text_list(name
,
143 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4'])
144 query
.answer
.append(rrset
)
146 expectedResponse
= dns
.message
.make_response(query
)
147 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
148 expectedResponse
.answer
.append(rrset
)
150 for method
in ("sendUDPQuery", "sendTCPQuery"):
151 sender
= getattr(self
, method
)
152 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
153 self
.assertEquals(receivedResponse
, expectedResponse
)
155 class TestRecordsCountNothingInNS(DNSDistTest
):
157 _config_template
= """
158 addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
159 addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
160 newServer{address="127.0.0.1:%s"}
163 def testRecordsCountRefuseNS(self
):
165 RecordsCount: Refuse nscount != 0
167 Send a query to "refusens.recordscount.tests.powerdns.com.",
168 check that we are getting a REFUSED response.
170 name
= 'refusens.recordscount.tests.powerdns.com.'
171 query
= dns
.message
.make_query(name
, 'A', 'IN')
172 rrset
= dns
.rrset
.from_text(name
,
176 'ns.tests.powerdns.com.')
177 query
.authority
.append(rrset
)
178 expectedResponse
= dns
.message
.make_response(query
)
179 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
180 expectedResponse
.authority
.append(rrset
)
182 for method
in ("sendUDPQuery", "sendTCPQuery"):
183 sender
= getattr(self
, method
)
184 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
185 self
.assertEquals(receivedResponse
, expectedResponse
)
188 def testRecordsCountAllowEmptyNS(self
):
190 RecordsCount: Allow nscount == 0
192 Send a query to "allowns.recordscount.tests.powerdns.com.",
193 check that we are getting a valid response.
195 name
= 'allowns.recordscount.tests.powerdns.com.'
196 query
= dns
.message
.make_query(name
, 'A', 'IN')
197 response
= dns
.message
.make_response(query
)
198 response
.answer
.append(dns
.rrset
.from_text(name
,
204 for method
in ("sendUDPQuery", "sendTCPQuery"):
205 sender
= getattr(self
, method
)
206 (receivedQuery
, receivedResponse
) = sender(query
, response
)
207 self
.assertTrue(receivedQuery
)
208 self
.assertTrue(receivedResponse
)
209 receivedQuery
.id = query
.id
210 self
.assertEquals(query
, receivedQuery
)
211 self
.assertEquals(response
, receivedResponse
)
213 class TestRecordsCountNoOPTInAR(DNSDistTest
):
215 _config_template
= """
216 addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, dnsdist.OPT, 0, 0)), RCodeAction(dnsdist.REFUSED))
217 newServer{address="127.0.0.1:%s"}
220 def testRecordsCountRefuseOPTinAR(self
):
222 RecordsTypeCount: Refuse OPT in AR
224 Send a query to "refuseoptinar.recordscount.tests.powerdns.com.",
225 check that we are getting a REFUSED response.
227 name
= 'refuseoptinar.recordscount.tests.powerdns.com.'
228 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True)
229 expectedResponse
= dns
.message
.make_response(query
)
230 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
232 for method
in ("sendUDPQuery", "sendTCPQuery"):
233 sender
= getattr(self
, method
)
234 (_
, receivedResponse
) = sender(query
, response
=None, useQueue
=False)
235 self
.assertEquals(receivedResponse
, expectedResponse
)
237 def testRecordsCountAllowNoOPTInAR(self
):
239 RecordsTypeCount: Allow no OPT in AR
241 Send a query to "allownooptinar.recordscount.tests.powerdns.com.",
242 check that we are getting a valid response.
244 name
= 'allowwnooptinar.recordscount.tests.powerdns.com.'
245 query
= dns
.message
.make_query(name
, 'A', 'IN')
246 response
= dns
.message
.make_response(query
)
247 response
.answer
.append(dns
.rrset
.from_text(name
,
253 for method
in ("sendUDPQuery", "sendTCPQuery"):
254 sender
= getattr(self
, method
)
255 (receivedQuery
, receivedResponse
) = sender(query
, response
)
256 self
.assertTrue(receivedQuery
)
257 self
.assertTrue(receivedResponse
)
258 receivedQuery
.id = query
.id
259 self
.assertEquals(query
, receivedQuery
)
260 self
.assertEquals(response
, receivedResponse
)
262 def testRecordsCountAllowTwoARButNoOPT(self
):
264 RecordsTypeCount: Allow arcount > 1 without OPT
266 Send a query to "allowtwoarnoopt.recordscount.tests.powerdns.com.",
267 check that we are getting a valid response.
269 name
= 'allowtwoarnoopt.recordscount.tests.powerdns.com.'
270 query
= dns
.message
.make_query(name
, 'A', 'IN')
271 query
.additional
.append(dns
.rrset
.from_text(name
,
276 query
.additional
.append(dns
.rrset
.from_text(name
,
282 response
= dns
.message
.make_response(query
)
283 response
.answer
.append(dns
.rrset
.from_text(name
,
289 for method
in ("sendUDPQuery", "sendTCPQuery"):
290 sender
= getattr(self
, method
)
291 (receivedQuery
, receivedResponse
) = sender(query
, response
)
292 self
.assertTrue(receivedQuery
)
293 self
.assertTrue(receivedResponse
)
294 receivedQuery
.id = query
.id
295 self
.assertEquals(query
, receivedQuery
)
296 self
.assertEquals(response
, receivedResponse
)