]>
Commit | Line | Data |
---|---|---|
55baa1f2 RG |
1 | #!/usr/bin/env python |
2 | import copy | |
3 | import os | |
4 | import dns | |
5 | from dnsdisttests import DNSDistTest | |
6 | ||
class TestRecordsCountOnlyOneAR(DNSDistTest):
    """
    RecordsCountRule applied to the additional section.

    The configuration answers REFUSED to every query whose additional
    section does not hold exactly one record (min == max == 1); anything
    else is passed to the backend declared with newServer.
    """

    _config_template = """
    addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseEmptyAR(self):
        """
        RecordsCount: Refuse arcount == 0 (No OPT)

        Send a query to "refuseemptyar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refuseemptyar.recordscount.tests.powerdns.com.'
        # No EDNS here, so the additional section of the query is empty.
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear RD on the query so the expected response derived from it
        # below does not carry the flag either.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # No backend response is supplied: the REFUSED answer is
            # expected to be generated without reaching the backend.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowOneAR(self):
        """
        RecordsCount: Allow arcount == 1 (OPT)

        Send a query to "allowonear.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowonear.recordscount.tests.powerdns.com.'
        # EDNS adds the OPT pseudo-record, i.e. exactly one additional record.
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs before comparing; presumably the ID seen by the
            # backend differs from the one the client used.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountRefuseTwoAR(self):
        """
        RecordsCount: Refuse arcount > 1 (OPT + a bogus additional record)

        Send a query to "refusetwoar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusetwoar.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        query.flags &= ~dns.flags.RD
        # On top of the OPT record, add an A record to the additional section.
        query.additional.append(dns.rrset.from_text(name,
                                                    3600,
                                                    dns.rdataclass.IN,
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
55baa1f2 RG |
79 | |
class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
    """
    RecordsCountRule applied to the answer section.

    The configuration allows queries carrying between two and three
    answer records and answers REFUSED to everything else.
    """

    _config_template = """
    addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
    addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseOneAN(self):
        """
        RecordsCount: Refuse ancount == 0

        Send a query to "refusenoan.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        # NOTE: despite the method name, the query built here carries no
        # answer record at all (ancount == 0), as the docstring states.
        name = 'refusenoan.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # Clear RD on the query so the expected response derived from it
        # below does not carry the flag either.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # No backend response is supplied: the REFUSED answer is
            # expected to be generated without reaching the backend.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowTwoAN(self):
        """
        RecordsCount: Allow ancount == 2

        Send a query to "allowtwoan.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowtwoan.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        rrset = dns.rrset.from_text_list(name,
                                         3600,
                                         dns.rdataclass.IN,
                                         dns.rdatatype.A,
                                         ['127.0.0.1', '127.0.0.2'])
        # The two records go into the answer section of the *query*, which
        # is what the rule under test inspects.
        query.answer.append(rrset)
        response = dns.message.make_response(query)
        response.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs before comparing; presumably the ID seen by the
            # backend differs from the one the client used.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountRefuseFourAN(self):
        """
        RecordsCount: Refuse ancount > 3

        Send a query to "refusefouran.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusefouran.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        query.flags &= ~dns.flags.RD
        rrset = dns.rrset.from_text_list(name,
                                         3600,
                                         dns.rdataclass.IN,
                                         dns.rdatatype.A,
                                         ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4'])
        query.answer.append(rrset)

        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        # The refused answer is expected to still contain the answer
        # records that were present in the query.
        expectedResponse.answer.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)
55baa1f2 RG |
158 | |
class TestRecordsCountNothingInNS(DNSDistTest):
    """
    RecordsCountRule applied to the authority section.

    The configuration allows queries whose authority section is empty
    (min == max == 0) and answers REFUSED to everything else.
    """

    _config_template = """
    addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
    addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseNS(self):
        """
        RecordsCount: Refuse nscount != 0

        Send a query to "refusens.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refusens.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.NS,
                                    'ns.tests.powerdns.com.')
        # Put an NS record into the authority section of the query.
        query.authority.append(rrset)
        # Clear RD on the query so the expected response derived from it
        # below does not carry the flag either.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)
        # The refused answer is expected to still contain the authority
        # record that was present in the query.
        expectedResponse.authority.append(rrset)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # No backend response is supplied: the REFUSED answer is
            # expected to be generated without reaching the backend.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowEmptyNS(self):
        """
        RecordsCount: Allow nscount == 0

        Send a query to "allowns.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowns.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs before comparing; presumably the ID seen by the
            # backend differs from the one the client used.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)
55baa1f2 RG |
217 | |
class TestRecordsCountNoOPTInAR(DNSDistTest):
    """
    RecordsTypeCountRule applied to OPT records in the additional section.

    The configuration answers REFUSED to every query that does not have
    exactly zero OPT records in its additional section; other additional
    records are not counted by this rule.
    """

    _config_template = """
    addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, DNSQType.OPT, 0, 0)), RCodeAction(DNSRCode.REFUSED))
    newServer{address="127.0.0.1:%s"}
    """

    def testRecordsCountRefuseOPTinAR(self):
        """
        RecordsTypeCount: Refuse OPT in AR

        Send a query to "refuseoptinar.recordscount.tests.powerdns.com.",
        check that we are getting a REFUSED response.
        """
        name = 'refuseoptinar.recordscount.tests.powerdns.com.'
        # EDNS adds an OPT pseudo-record to the additional section.
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
        # Clear RD on the query so the expected response derived from it
        # below does not carry the flag either.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            # No backend response is supplied: the REFUSED answer is
            # expected to be generated without reaching the backend.
            (_, receivedResponse) = sender(query, response=None, useQueue=False)
            self.assertEqual(receivedResponse, expectedResponse)

    def testRecordsCountAllowNoOPTInAR(self):
        """
        RecordsTypeCount: Allow no OPT in AR

        Send a query to "allownooptinar.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allownooptinar.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # Align the IDs before comparing; presumably the ID seen by the
            # backend differs from the one the client used.
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testRecordsCountAllowTwoARButNoOPT(self):
        """
        RecordsTypeCount: Allow arcount > 1 without OPT

        Send a query to "allowtwoarnoopt.recordscount.tests.powerdns.com.",
        check that we are getting a valid response.
        """
        name = 'allowtwoarnoopt.recordscount.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # Two A record sets in the additional section, but no OPT record.
        query.additional.append(dns.rrset.from_text(name,
                                                    3600,
                                                    dns.rdataclass.IN,
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))
        query.additional.append(dns.rrset.from_text(name,
                                                    3600,
                                                    dns.rdataclass.IN,
                                                    dns.rdatatype.A,
                                                    '127.0.0.1'))

        response = dns.message.make_response(query)
        response.answer.append(dns.rrset.from_text(name,
                                                   3600,
                                                   dns.rdataclass.IN,
                                                   dns.rdatatype.A,
                                                   '127.0.0.1'))

        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)