git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_RecordsCount.py
Merge pull request #3926 from pieterlexis/recursor-CD-on-fwd
[thirdparty/pdns.git] / regression-tests.dnsdist / test_RecordsCount.py
1 #!/usr/bin/env python
2 import copy
3 import os
4 import dns
5 from dnsdisttests import DNSDistTest
6
class TestRecordsCountOnlyOneAR(DNSDistTest):
    """
    Tests for a rule on the record count of the additional section.

    The configuration answers REFUSED to every query that does not match
    RecordsCountRule(DNSSection.Additional, 1, 1); as the tests below
    exercise it, only queries with exactly one additional record are
    forwarded to the backend.
    """

    # NOTE(review): the "%s" placeholder is presumably filled in by the
    # DNSDistTest harness with the backend port -- confirm there.
    _config_template = """
    addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseEmptyAR(self):
        """
        RecordsCount: Refuse arcount == 0

        Send a query to "refuseemptyar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refuseemptyar.recordscount.tests.powerdns.com.'
        # No EDNS and nothing appended: the additional section is empty.
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # Same check over UDP, then TCP. No backend response is supplied
        # (response=None, useQueue=False) since the query is expected to be
        # answered without reaching the backend.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowOneAR(self):
        """
        RecordsCount: Allow arcount == 1

        Send a query to "allowonear.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowonear.recordscount.tests.powerdns.com.'
        # EDNS puts one OPT pseudo-record in the additional section.
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        # Same check over UDP, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing; presumably the ID seen by the
            # backend differs from the one the client sent -- TODO confirm.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountRefuseTwoAR(self):
        """
        RecordsCount: Refuse arcount > 1

        Send a query to "refusetwoar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusetwoar.recordscount.tests.powerdns.com.'
        # EDNS OPT plus one explicit A record: two additional records.
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        query.additional.append(dns.rrset.from_text(name,
                                                    3600,
                                                    dns.rdataclass.IN,
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # Same check over UDP, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
84
class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
    """
    Tests for a rule on the record count of the answer section.

    The configuration allows queries matching
    RecordsCountRule(DNSSection.Answer, 2, 3) and answers REFUSED to
    everything else.
    """

    # NOTE(review): the "%s" placeholder is presumably filled in by the
    # DNSDistTest harness with the backend port -- confirm there.
    _config_template = """
    addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
    addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    # The method name says "OneAN", but the query below carries no answer
    # record at all (see the docstring). The name is kept so existing test
    # identifiers stay valid.
    def testRecordsCountRefuseOneAN(self):
        """
        RecordsCount: Refuse ancount == 0

        Send a query to "refusenoan.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusenoan.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # Same check over UDP, then TCP; no backend response is supplied.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowTwoAN(self):
        """
        RecordsCount: Allow ancount == 2

        Send a query to "allowtwoan.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowtwoan.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        # One RRset holding two A records, placed in the *query's* answer
        # section so that the rule under test sees ancount == 2.
        rrset = dns.rrset.from_text_list(name,
                                         3600,
                                         dns.rdataclass.IN,
                                         dns.rdatatype.A,
                                         ['127.0.0.1', '127.0.0.2'])
        query.answer.append(rrset)
        response = dns.message.make_response(query)
        response.answer.append(rrset)

        # Same check over UDP, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing; presumably the ID seen by the
            # backend differs from the one the client sent -- TODO confirm.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountRefuseFourAN(self):
        """
        RecordsCount: Refuse ancount > 3

        Send a query to "refusefouran.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusefouran.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        # Four A records in the query's answer section: one more than the
        # upper bound of the allow rule.
        rrset = dns.rrset.from_text_list(name,
                                         3600,
                                         dns.rdataclass.IN,
                                         dns.rdatatype.A,
                                         ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4'])
        query.answer.append(rrset)

        # The expected REFUSED response still carries the answer records
        # that were sent in the query.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        expectedResponse.answer.append(rrset)

        # Same check over UDP, then TCP; no backend response is supplied.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
168
class TestRecordsCountNothingInNS(DNSDistTest):
    """
    Tests for a rule on the record count of the authority section.

    The configuration allows queries matching
    RecordsCountRule(DNSSection.Authority, 0, 0) and answers REFUSED to
    everything else.
    """

    # NOTE(review): the "%s" placeholder is presumably filled in by the
    # DNSDistTest harness with the backend port -- confirm there.
    _config_template = """
    addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
    addAction(AllRule(), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseNS(self):
        """
        RecordsCount: Refuse nscount != 0

        Send a query to "refusens.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusens.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # One NS record in the query's authority section.
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.NS,
                                    'ns.tests.powerdns.com.')
        query.authority.append(rrset)
        # The expected REFUSED response still carries the authority record
        # that was sent in the query.
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        expectedResponse.authority.append(rrset)

        # Same check over UDP, then TCP; no backend response is supplied.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowEmptyNS(self):
        """
        RecordsCount: Allow nscount == 0

        Send a query to "allowns.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowns.recordscount.tests.powerdns.com.'
        # Plain query: the authority section is left empty.
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        # Same check over UDP, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing; presumably the ID seen by the
            # backend differs from the one the client sent -- TODO confirm.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
232
class TestRecordsCountNoOPTInAR(DNSDistTest):
    """
    Tests for a rule on the count of records of one type in a section.

    The configuration answers REFUSED to every query that does not match
    RecordsTypeCountRule(DNSSection.Additional, dnsdist.OPT, 0, 0); as the
    tests below exercise it, a query with an OPT record in its additional
    section is refused.
    """

    # NOTE(review): the "%s" placeholder is presumably filled in by the
    # DNSDistTest harness with the backend port -- confirm there.
    _config_template = """
    addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, dnsdist.OPT, 0, 0)), RCodeAction(dnsdist.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseOPTinAR(self):
        """
        RecordsTypeCount: Refuse OPT in AR

        Send a query to "refuseoptinar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refuseoptinar.recordscount.tests.powerdns.com.'
        # EDNS puts one OPT pseudo-record in the additional section.
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        # Same check over UDP, then TCP; no backend response is supplied.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowNoOPTInAR(self):
        """
        RecordsTypeCount: Allow no OPT in AR

        Send a query to "allownooptinar.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        # The name now matches the docstring (it used to carry a doubled
        # "w"); no rule in this configuration matches on the query name.
        name = 'allownooptinar.recordscount.tests.powerdns.com.'
        # No EDNS: the additional section holds no OPT record.
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        # Same check over UDP, then TCP.
        for sender in (self.sendUDPQuery, self.sendTCPQuery):
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the ID before comparing; presumably the ID seen by the
            # backend differs from the one the client sent -- TODO confirm.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)