]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_RecordsCount.py
5 from dnsdisttests
import DNSDistTest
class TestRecordsCountOnlyOneAR(DNSDistTest):
    """Exercise RecordsCountRule against the Additional section.

    The configuration below answers REFUSED to any query matched by
    NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)); per the test
    descriptions that means queries whose additional count is 0 or
    greater than 1, while a count of exactly 1 is forwarded to the backend.
    """

    _config_template = """
    addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseEmptyAR(self):
        """
        RecordsCount: Refuse arcount == 0

        Send a query to "refuseemptyar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refuseemptyar.recordscount.tests.powerdns.com.'
        # No EDNS and nothing appended: the additional section stays empty.
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # Same check over UDP, then TCP; no backend response is queued.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowOneAR(self):
        """
        RecordsCount: Allow arcount == 1

        Send a query to "allowonear.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowonear.recordscount.tests.powerdns.com.'
        # presumably use_edns=True contributes the single additional (OPT)
        # record this test relies on -- confirm against dnspython.
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        response = dns.message.make_response(query)
        # NOTE(review): the TTL/class/type/rdata arguments were missing from
        # the garbled source text; the values below are assumed -- confirm
        # against the upstream file.
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the message ID before comparing the forwarded query
            # with the one we sent.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountRefuseTwoAR(self):
        """
        RecordsCount: Refuse arcount > 1

        Send a query to "refusetwoar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusetwoar.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        # One extra record is appended to the additional section on top of
        # whatever use_edns=True puts there.
        # NOTE(review): the TTL/class/type/rdata arguments were missing from
        # the garbled source text; the values below are assumed -- confirm
        # against the upstream file.
        query.additional.append(dns.rrset.from_text(name,
                                                    3600,
                                                    dns.rdataclass.IN,
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
    """Exercise RecordsCountRule against the Answer section.

    The configuration below applies AllowAction to queries matched by
    RecordsCountRule(DNSSection.Answer, 2, 3) and answers REFUSED to
    everything else (AllRule).
    """

    _config_template = """
    addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
    addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseOneAN(self):
        """
        RecordsCount: Refuse ancount == 0

        Send a query to "refusenoan.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        # Despite the method name, the query built here carries no answer
        # records at all (nothing is appended to query.answer).
        name = 'refusenoan.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # Same check over UDP, then TCP; no backend response is queued.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowTwoAN(self):
        """
        RecordsCount: Allow ancount == 2

        Send a query to "allowtwoan.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowtwoan.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        # NOTE(review): the TTL/class/type arguments were missing from the
        # garbled source text; the values below are assumed -- confirm
        # against the upstream file.
        rrset = dns.rrset.from_text_list(name,
                                         3600,
                                         dns.rdataclass.IN,
                                         dns.rdatatype.A,
                                         ['127.0.0.1', '127.0.0.2'])
        # The two-record rrset goes into the *query's* answer section, which
        # is what the rule under test inspects.
        query.answer.append(rrset)
        response = dns.message.make_response(query)
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the message ID before comparing the forwarded query
            # with the one we sent.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountRefuseFourAN(self):
        """
        RecordsCount: Refuse ancount > 3

        Send a query to "refusefouran.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusefouran.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        # NOTE(review): the TTL/class/type arguments were missing from the
        # garbled source text; the values below are assumed -- confirm
        # against the upstream file.
        rrset = dns.rrset.from_text_list(name,
                                         3600,
                                         dns.rdataclass.IN,
                                         dns.rdatatype.A,
                                         ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4'])
        query.answer.append(rrset)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        # The refused reply is expected to still carry the answer records
        # that were present in the query.
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
class TestRecordsCountNothingInNS(DNSDistTest):
    """Exercise RecordsCountRule against the Authority section.

    The configuration below applies AllowAction to queries matched by
    RecordsCountRule(DNSSection.Authority, 0, 0) and answers REFUSED to
    everything else (AllRule).
    """

    _config_template = """
    addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
    addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseNS(self):
        """
        RecordsCount: Refuse nscount != 0

        Send a query to "refusens.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusens.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # NOTE(review): the TTL/class/type arguments were missing from the
        # garbled source text; the values below (an NS record) are assumed
        # from the rdata text -- confirm against the upstream file.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.NS,
                                    'ns.tests.powerdns.com.')
        query.authority.append(rrset)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        # The refused reply is expected to still carry the authority record
        # that was present in the query.
        expectedResponse.authority.append(rrset)

        # Same check over UDP, then TCP; no backend response is queued.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowEmptyNS(self):
        """
        RecordsCount: Allow nscount == 0

        Send a query to "allowns.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowns.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL/class/type/rdata arguments were missing from
        # the garbled source text; the values below are assumed -- confirm
        # against the upstream file.
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the message ID before comparing the forwarded query
            # with the one we sent.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
class TestRecordsCountNoOPTInAR(DNSDistTest):
    """Exercise RecordsTypeCountRule for OPT records in the Additional section.

    The configuration below answers REFUSED to any query matched by
    NotRule(RecordsTypeCountRule(DNSSection.Additional, dnsdist.OPT, 0, 0));
    per the test descriptions that means queries carrying an OPT record,
    while queries without one are forwarded to the backend.
    """

    _config_template = """
    addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, dnsdist.OPT, 0, 0)), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseOPTinAR(self):
        """
        RecordsTypeCount: Refuse OPT in AR

        Send a query to "refuseoptinar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refuseoptinar.recordscount.tests.powerdns.com.'
        # presumably use_edns=True is what places the OPT record in the
        # additional section -- confirm against dnspython.
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # Same check over UDP, then TCP; no backend response is queued.
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowNoOPTInAR(self):
        """
        RecordsTypeCount: Allow no OPT in AR

        Send a query to "allownooptinar.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allownooptinar.recordscount.tests.powerdns.com.'
        # No use_edns here, so the query is built without EDNS.
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        # NOTE(review): the TTL/class/type/rdata arguments were missing from
        # the garbled source text; the values below are assumed -- confirm
        # against the upstream file.
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the message ID before comparing the forwarded query
            # with the one we sent.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)