]>
Commit | Line | Data |
---|---|---|
55baa1f2 RG |
1 | #!/usr/bin/env python |
2 | import copy | |
3 | import os | |
4 | import dns | |
5 | from dnsdisttests import DNSDistTest | |
6 | ||
7 | class TestRecordsCountOnlyOneAR(DNSDistTest): | |
8 | ||
9 | _config_template = """ | |
d3ec24f9 | 10 | addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(DNSRCode.REFUSED)) |
55baa1f2 RG |
11 | newServer{address="127.0.0.1:%s"} |
12 | """ | |
13 | ||
14 | def testRecordsCountRefuseEmptyAR(self): | |
15 | """ | |
65fc9d08 | 16 | RecordsCount: Refuse arcount == 0 (No OPT) |
55baa1f2 RG |
17 | |
18 | Send a query to "refuseemptyar.recordscount.tests.powerdns.com.", | |
19 | check that we are getting a REFUSED response. | |
20 | """ | |
21 | name = 'refuseemptyar.recordscount.tests.powerdns.com.' | |
22 | query = dns.message.make_query(name, 'A', 'IN') | |
7af22479 | 23 | query.flags &= ~dns.flags.RD |
55baa1f2 RG |
24 | expectedResponse = dns.message.make_response(query) |
25 | expectedResponse.set_rcode(dns.rcode.REFUSED) | |
26 | ||
6ca2e796 RG |
27 | for method in ("sendUDPQuery", "sendTCPQuery"): |
28 | sender = getattr(self, method) | |
29 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
4bfebc93 | 30 | self.assertEqual(receivedResponse, expectedResponse) |
55baa1f2 RG |
31 | |
32 | def testRecordsCountAllowOneAR(self): | |
33 | """ | |
65fc9d08 | 34 | RecordsCount: Allow arcount == 1 (OPT) |
55baa1f2 RG |
35 | |
36 | Send a query to "allowonear.recordscount.tests.powerdns.com.", | |
37 | check that we are getting a valid response. | |
38 | """ | |
39 | name = 'allowonear.recordscount.tests.powerdns.com.' | |
40 | query = dns.message.make_query(name, 'A', 'IN', use_edns=True) | |
41 | response = dns.message.make_response(query) | |
42 | response.answer.append(dns.rrset.from_text(name, | |
43 | 3600, | |
44 | dns.rdataclass.IN, | |
45 | dns.rdatatype.A, | |
46 | '127.0.0.1')) | |
47 | ||
6ca2e796 RG |
48 | for method in ("sendUDPQuery", "sendTCPQuery"): |
49 | sender = getattr(self, method) | |
50 | (receivedQuery, receivedResponse) = sender(query, response) | |
51 | self.assertTrue(receivedQuery) | |
52 | self.assertTrue(receivedResponse) | |
53 | receivedQuery.id = query.id | |
4bfebc93 CH |
54 | self.assertEqual(query, receivedQuery) |
55 | self.assertEqual(response, receivedResponse) | |
55baa1f2 RG |
56 | |
57 | def testRecordsCountRefuseTwoAR(self): | |
58 | """ | |
65fc9d08 | 59 | RecordsCount: Refuse arcount > 1 (OPT + a bogus additional record) |
55baa1f2 RG |
60 | |
61 | Send a query to "refusetwoar.recordscount.tests.powerdns.com.", | |
62 | check that we are getting a REFUSED response. | |
63 | """ | |
64 | name = 'refusetwoar.recordscount.tests.powerdns.com.' | |
65 | query = dns.message.make_query(name, 'A', 'IN', use_edns=True) | |
7af22479 | 66 | query.flags &= ~dns.flags.RD |
55baa1f2 RG |
67 | query.additional.append(dns.rrset.from_text(name, |
68 | 3600, | |
69 | dns.rdataclass.IN, | |
70 | dns.rdatatype.A, | |
71 | '127.0.0.1')) | |
72 | expectedResponse = dns.message.make_response(query) | |
73 | expectedResponse.set_rcode(dns.rcode.REFUSED) | |
6e1f856f RG |
74 | # this is not great, we should fix that! |
75 | expectedResponse.additional.append(dns.rrset.from_text(name, | |
76 | 3600, | |
77 | dns.rdataclass.IN, | |
78 | dns.rdatatype.A, | |
79 | '127.0.0.1')) | |
55baa1f2 | 80 | |
6ca2e796 RG |
81 | for method in ("sendUDPQuery", "sendTCPQuery"): |
82 | sender = getattr(self, method) | |
83 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
4bfebc93 | 84 | self.assertEqual(receivedResponse, expectedResponse) |
55baa1f2 RG |
85 | |
86 | class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest): | |
87 | ||
88 | _config_template = """ | |
89 | addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction()) | |
d3ec24f9 | 90 | addAction(AllRule(), RCodeAction(DNSRCode.REFUSED)) |
55baa1f2 RG |
91 | newServer{address="127.0.0.1:%s"} |
92 | """ | |
93 | ||
94 | def testRecordsCountRefuseOneAN(self): | |
95 | """ | |
96 | RecordsCount: Refuse ancount == 0 | |
97 | ||
98 | Send a query to "refusenoan.recordscount.tests.powerdns.com.", | |
99 | check that we are getting a REFUSED response. | |
100 | """ | |
101 | name = 'refusenoan.recordscount.tests.powerdns.com.' | |
102 | query = dns.message.make_query(name, 'A', 'IN') | |
7af22479 | 103 | query.flags &= ~dns.flags.RD |
55baa1f2 RG |
104 | expectedResponse = dns.message.make_response(query) |
105 | expectedResponse.set_rcode(dns.rcode.REFUSED) | |
106 | ||
6ca2e796 RG |
107 | for method in ("sendUDPQuery", "sendTCPQuery"): |
108 | sender = getattr(self, method) | |
109 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
4bfebc93 | 110 | self.assertEqual(receivedResponse, expectedResponse) |
55baa1f2 RG |
111 | |
112 | def testRecordsCountAllowTwoAN(self): | |
113 | """ | |
114 | RecordsCount: Allow ancount == 2 | |
115 | ||
116 | Send a query to "allowtwoan.recordscount.tests.powerdns.com.", | |
117 | check that we are getting a valid response. | |
118 | """ | |
119 | name = 'allowtwoan.recordscount.tests.powerdns.com.' | |
120 | query = dns.message.make_query(name, 'A', 'IN', use_edns=True) | |
121 | rrset = dns.rrset.from_text_list(name, | |
122 | 3600, | |
123 | dns.rdataclass.IN, | |
124 | dns.rdatatype.A, | |
125 | ['127.0.0.1', '127.0.0.2']) | |
126 | query.answer.append(rrset) | |
127 | response = dns.message.make_response(query) | |
128 | response.answer.append(rrset) | |
129 | ||
6ca2e796 RG |
130 | for method in ("sendUDPQuery", "sendTCPQuery"): |
131 | sender = getattr(self, method) | |
132 | (receivedQuery, receivedResponse) = sender(query, response) | |
133 | self.assertTrue(receivedQuery) | |
134 | self.assertTrue(receivedResponse) | |
135 | receivedQuery.id = query.id | |
4bfebc93 CH |
136 | self.assertEqual(query, receivedQuery) |
137 | self.assertEqual(response, receivedResponse) | |
55baa1f2 RG |
138 | |
139 | def testRecordsCountRefuseFourAN(self): | |
140 | """ | |
141 | RecordsCount: Refuse ancount > 3 | |
142 | ||
143 | Send a query to "refusefouran.recordscount.tests.powerdns.com.", | |
144 | check that we are getting a REFUSED response. | |
145 | """ | |
146 | name = 'refusefouran.recordscount.tests.powerdns.com.' | |
147 | query = dns.message.make_query(name, 'A', 'IN', use_edns=True) | |
7af22479 | 148 | query.flags &= ~dns.flags.RD |
55baa1f2 RG |
149 | rrset = dns.rrset.from_text_list(name, |
150 | 3600, | |
151 | dns.rdataclass.IN, | |
152 | dns.rdatatype.A, | |
153 | ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4']) | |
154 | query.answer.append(rrset) | |
155 | ||
156 | expectedResponse = dns.message.make_response(query) | |
157 | expectedResponse.set_rcode(dns.rcode.REFUSED) | |
158 | expectedResponse.answer.append(rrset) | |
159 | ||
6ca2e796 RG |
160 | for method in ("sendUDPQuery", "sendTCPQuery"): |
161 | sender = getattr(self, method) | |
162 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
4bfebc93 | 163 | self.assertEqual(receivedResponse, expectedResponse) |
55baa1f2 RG |
164 | |
165 | class TestRecordsCountNothingInNS(DNSDistTest): | |
166 | ||
167 | _config_template = """ | |
168 | addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction()) | |
d3ec24f9 | 169 | addAction(AllRule(), RCodeAction(DNSRCode.REFUSED)) |
55baa1f2 RG |
170 | newServer{address="127.0.0.1:%s"} |
171 | """ | |
172 | ||
173 | def testRecordsCountRefuseNS(self): | |
174 | """ | |
175 | RecordsCount: Refuse nscount != 0 | |
176 | ||
177 | Send a query to "refusens.recordscount.tests.powerdns.com.", | |
178 | check that we are getting a REFUSED response. | |
179 | """ | |
180 | name = 'refusens.recordscount.tests.powerdns.com.' | |
181 | query = dns.message.make_query(name, 'A', 'IN') | |
182 | rrset = dns.rrset.from_text(name, | |
183 | 3600, | |
184 | dns.rdataclass.IN, | |
185 | dns.rdatatype.NS, | |
186 | 'ns.tests.powerdns.com.') | |
187 | query.authority.append(rrset) | |
7af22479 | 188 | query.flags &= ~dns.flags.RD |
55baa1f2 RG |
189 | expectedResponse = dns.message.make_response(query) |
190 | expectedResponse.set_rcode(dns.rcode.REFUSED) | |
191 | expectedResponse.authority.append(rrset) | |
192 | ||
6ca2e796 RG |
193 | for method in ("sendUDPQuery", "sendTCPQuery"): |
194 | sender = getattr(self, method) | |
195 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
4bfebc93 | 196 | self.assertEqual(receivedResponse, expectedResponse) |
55baa1f2 RG |
197 | |
198 | ||
199 | def testRecordsCountAllowEmptyNS(self): | |
200 | """ | |
201 | RecordsCount: Allow nscount == 0 | |
202 | ||
203 | Send a query to "allowns.recordscount.tests.powerdns.com.", | |
204 | check that we are getting a valid response. | |
205 | """ | |
206 | name = 'allowns.recordscount.tests.powerdns.com.' | |
207 | query = dns.message.make_query(name, 'A', 'IN') | |
208 | response = dns.message.make_response(query) | |
209 | response.answer.append(dns.rrset.from_text(name, | |
210 | 3600, | |
211 | dns.rdataclass.IN, | |
212 | dns.rdatatype.A, | |
213 | '127.0.0.1')) | |
214 | ||
6ca2e796 RG |
215 | for method in ("sendUDPQuery", "sendTCPQuery"): |
216 | sender = getattr(self, method) | |
217 | (receivedQuery, receivedResponse) = sender(query, response) | |
218 | self.assertTrue(receivedQuery) | |
219 | self.assertTrue(receivedResponse) | |
220 | receivedQuery.id = query.id | |
4bfebc93 CH |
221 | self.assertEqual(query, receivedQuery) |
222 | self.assertEqual(response, receivedResponse) | |
55baa1f2 RG |
223 | |
224 | class TestRecordsCountNoOPTInAR(DNSDistTest): | |
225 | ||
226 | _config_template = """ | |
d3ec24f9 | 227 | addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, DNSQType.OPT, 0, 0)), RCodeAction(DNSRCode.REFUSED)) |
55baa1f2 RG |
228 | newServer{address="127.0.0.1:%s"} |
229 | """ | |
230 | ||
231 | def testRecordsCountRefuseOPTinAR(self): | |
232 | """ | |
233 | RecordsTypeCount: Refuse OPT in AR | |
234 | ||
235 | Send a query to "refuseoptinar.recordscount.tests.powerdns.com.", | |
236 | check that we are getting a REFUSED response. | |
237 | """ | |
238 | name = 'refuseoptinar.recordscount.tests.powerdns.com.' | |
239 | query = dns.message.make_query(name, 'A', 'IN', use_edns=True) | |
7af22479 | 240 | query.flags &= ~dns.flags.RD |
55baa1f2 RG |
241 | expectedResponse = dns.message.make_response(query) |
242 | expectedResponse.set_rcode(dns.rcode.REFUSED) | |
243 | ||
6ca2e796 RG |
244 | for method in ("sendUDPQuery", "sendTCPQuery"): |
245 | sender = getattr(self, method) | |
246 | (_, receivedResponse) = sender(query, response=None, useQueue=False) | |
4bfebc93 | 247 | self.assertEqual(receivedResponse, expectedResponse) |
55baa1f2 RG |
248 | |
249 | def testRecordsCountAllowNoOPTInAR(self): | |
250 | """ | |
251 | RecordsTypeCount: Allow no OPT in AR | |
252 | ||
253 | Send a query to "allownooptinar.recordscount.tests.powerdns.com.", | |
254 | check that we are getting a valid response. | |
255 | """ | |
256 | name = 'allowwnooptinar.recordscount.tests.powerdns.com.' | |
257 | query = dns.message.make_query(name, 'A', 'IN') | |
258 | response = dns.message.make_response(query) | |
259 | response.answer.append(dns.rrset.from_text(name, | |
260 | 3600, | |
261 | dns.rdataclass.IN, | |
262 | dns.rdatatype.A, | |
263 | '127.0.0.1')) | |
264 | ||
6ca2e796 RG |
265 | for method in ("sendUDPQuery", "sendTCPQuery"): |
266 | sender = getattr(self, method) | |
267 | (receivedQuery, receivedResponse) = sender(query, response) | |
268 | self.assertTrue(receivedQuery) | |
269 | self.assertTrue(receivedResponse) | |
270 | receivedQuery.id = query.id | |
4bfebc93 CH |
271 | self.assertEqual(query, receivedQuery) |
272 | self.assertEqual(response, receivedResponse) | |
65fc9d08 RG |
273 | |
274 | def testRecordsCountAllowTwoARButNoOPT(self): | |
275 | """ | |
276 | RecordsTypeCount: Allow arcount > 1 without OPT | |
277 | ||
278 | Send a query to "allowtwoarnoopt.recordscount.tests.powerdns.com.", | |
279 | check that we are getting a valid response. | |
280 | """ | |
281 | name = 'allowtwoarnoopt.recordscount.tests.powerdns.com.' | |
282 | query = dns.message.make_query(name, 'A', 'IN') | |
283 | query.additional.append(dns.rrset.from_text(name, | |
284 | 3600, | |
285 | dns.rdataclass.IN, | |
286 | dns.rdatatype.A, | |
287 | '127.0.0.1')) | |
288 | query.additional.append(dns.rrset.from_text(name, | |
289 | 3600, | |
290 | dns.rdataclass.IN, | |
291 | dns.rdatatype.A, | |
292 | '127.0.0.1')) | |
293 | ||
294 | response = dns.message.make_response(query) | |
295 | response.answer.append(dns.rrset.from_text(name, | |
296 | 3600, | |
297 | dns.rdataclass.IN, | |
298 | dns.rdatatype.A, | |
299 | '127.0.0.1')) | |
300 | ||
6ca2e796 RG |
301 | for method in ("sendUDPQuery", "sendTCPQuery"): |
302 | sender = getattr(self, method) | |
303 | (receivedQuery, receivedResponse) = sender(query, response) | |
304 | self.assertTrue(receivedQuery) | |
305 | self.assertTrue(receivedResponse) | |
306 | receivedQuery.id = query.id | |
4bfebc93 CH |
307 | self.assertEqual(query, receivedQuery) |
308 | self.assertEqual(response, receivedResponse) |