]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_RecordsCount.py
dnsdist: Test setting the value of AA, AD and RA when spoofing
[thirdparty/pdns.git] / regression-tests.dnsdist / test_RecordsCount.py
1 #!/usr/bin/env python
2 import copy
3 import os
4 import dns
5 from dnsdisttests import DNSDistTest
6
7 class TestRecordsCountOnlyOneAR(DNSDistTest):
8
9 _config_template = """
10 addAction(NotRule(RecordsCountRule(DNSSection.Additional, 1, 1)), RCodeAction(DNSRCode.REFUSED))
11 newServer{address="127.0.0.1:%s"}
12 """
13
14 def testRecordsCountRefuseEmptyAR(self):
15 """
16 RecordsCount: Refuse arcount == 0 (No OPT)
17
18 Send a query to "refuseemptyar.recordscount.tests.powerdns.com.",
19 check that we are getting a REFUSED response.
20 """
21 name = 'refuseemptyar.recordscount.tests.powerdns.com.'
22 query = dns.message.make_query(name, 'A', 'IN')
23 expectedResponse = dns.message.make_response(query)
24 expectedResponse.set_rcode(dns.rcode.REFUSED)
25
26 for method in ("sendUDPQuery", "sendTCPQuery"):
27 sender = getattr(self, method)
28 (_, receivedResponse) = sender(query, response=None, useQueue=False)
29 self.assertEquals(receivedResponse, expectedResponse)
30
31 def testRecordsCountAllowOneAR(self):
32 """
33 RecordsCount: Allow arcount == 1 (OPT)
34
35 Send a query to "allowonear.recordscount.tests.powerdns.com.",
36 check that we are getting a valid response.
37 """
38 name = 'allowonear.recordscount.tests.powerdns.com.'
39 query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
40 response = dns.message.make_response(query)
41 response.answer.append(dns.rrset.from_text(name,
42 3600,
43 dns.rdataclass.IN,
44 dns.rdatatype.A,
45 '127.0.0.1'))
46
47 for method in ("sendUDPQuery", "sendTCPQuery"):
48 sender = getattr(self, method)
49 (receivedQuery, receivedResponse) = sender(query, response)
50 self.assertTrue(receivedQuery)
51 self.assertTrue(receivedResponse)
52 receivedQuery.id = query.id
53 self.assertEquals(query, receivedQuery)
54 self.assertEquals(response, receivedResponse)
55
56 def testRecordsCountRefuseTwoAR(self):
57 """
58 RecordsCount: Refuse arcount > 1 (OPT + a bogus additional record)
59
60 Send a query to "refusetwoar.recordscount.tests.powerdns.com.",
61 check that we are getting a REFUSED response.
62 """
63 name = 'refusetwoar.recordscount.tests.powerdns.com.'
64 query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
65 query.additional.append(dns.rrset.from_text(name,
66 3600,
67 dns.rdataclass.IN,
68 dns.rdatatype.A,
69 '127.0.0.1'))
70 expectedResponse = dns.message.make_response(query)
71 expectedResponse.set_rcode(dns.rcode.REFUSED)
72
73 for method in ("sendUDPQuery", "sendTCPQuery"):
74 sender = getattr(self, method)
75 (_, receivedResponse) = sender(query, response=None, useQueue=False)
76 self.assertEquals(receivedResponse, expectedResponse)
77
78 class TestRecordsCountMoreThanOneLessThanFour(DNSDistTest):
79
80 _config_template = """
81 addAction(RecordsCountRule(DNSSection.Answer, 2, 3), AllowAction())
82 addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
83 newServer{address="127.0.0.1:%s"}
84 """
85
86 def testRecordsCountRefuseOneAN(self):
87 """
88 RecordsCount: Refuse ancount == 0
89
90 Send a query to "refusenoan.recordscount.tests.powerdns.com.",
91 check that we are getting a REFUSED response.
92 """
93 name = 'refusenoan.recordscount.tests.powerdns.com.'
94 query = dns.message.make_query(name, 'A', 'IN')
95 expectedResponse = dns.message.make_response(query)
96 expectedResponse.set_rcode(dns.rcode.REFUSED)
97
98 for method in ("sendUDPQuery", "sendTCPQuery"):
99 sender = getattr(self, method)
100 (_, receivedResponse) = sender(query, response=None, useQueue=False)
101 self.assertEquals(receivedResponse, expectedResponse)
102
103 def testRecordsCountAllowTwoAN(self):
104 """
105 RecordsCount: Allow ancount == 2
106
107 Send a query to "allowtwoan.recordscount.tests.powerdns.com.",
108 check that we are getting a valid response.
109 """
110 name = 'allowtwoan.recordscount.tests.powerdns.com.'
111 query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
112 rrset = dns.rrset.from_text_list(name,
113 3600,
114 dns.rdataclass.IN,
115 dns.rdatatype.A,
116 ['127.0.0.1', '127.0.0.2'])
117 query.answer.append(rrset)
118 response = dns.message.make_response(query)
119 response.answer.append(rrset)
120
121 for method in ("sendUDPQuery", "sendTCPQuery"):
122 sender = getattr(self, method)
123 (receivedQuery, receivedResponse) = sender(query, response)
124 self.assertTrue(receivedQuery)
125 self.assertTrue(receivedResponse)
126 receivedQuery.id = query.id
127 self.assertEquals(query, receivedQuery)
128 self.assertEquals(response, receivedResponse)
129
130 def testRecordsCountRefuseFourAN(self):
131 """
132 RecordsCount: Refuse ancount > 3
133
134 Send a query to "refusefouran.recordscount.tests.powerdns.com.",
135 check that we are getting a REFUSED response.
136 """
137 name = 'refusefouran.recordscount.tests.powerdns.com.'
138 query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
139 rrset = dns.rrset.from_text_list(name,
140 3600,
141 dns.rdataclass.IN,
142 dns.rdatatype.A,
143 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4'])
144 query.answer.append(rrset)
145
146 expectedResponse = dns.message.make_response(query)
147 expectedResponse.set_rcode(dns.rcode.REFUSED)
148 expectedResponse.answer.append(rrset)
149
150 for method in ("sendUDPQuery", "sendTCPQuery"):
151 sender = getattr(self, method)
152 (_, receivedResponse) = sender(query, response=None, useQueue=False)
153 self.assertEquals(receivedResponse, expectedResponse)
154
155 class TestRecordsCountNothingInNS(DNSDistTest):
156
157 _config_template = """
158 addAction(RecordsCountRule(DNSSection.Authority, 0, 0), AllowAction())
159 addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
160 newServer{address="127.0.0.1:%s"}
161 """
162
163 def testRecordsCountRefuseNS(self):
164 """
165 RecordsCount: Refuse nscount != 0
166
167 Send a query to "refusens.recordscount.tests.powerdns.com.",
168 check that we are getting a REFUSED response.
169 """
170 name = 'refusens.recordscount.tests.powerdns.com.'
171 query = dns.message.make_query(name, 'A', 'IN')
172 rrset = dns.rrset.from_text(name,
173 3600,
174 dns.rdataclass.IN,
175 dns.rdatatype.NS,
176 'ns.tests.powerdns.com.')
177 query.authority.append(rrset)
178 expectedResponse = dns.message.make_response(query)
179 expectedResponse.set_rcode(dns.rcode.REFUSED)
180 expectedResponse.authority.append(rrset)
181
182 for method in ("sendUDPQuery", "sendTCPQuery"):
183 sender = getattr(self, method)
184 (_, receivedResponse) = sender(query, response=None, useQueue=False)
185 self.assertEquals(receivedResponse, expectedResponse)
186
187
188 def testRecordsCountAllowEmptyNS(self):
189 """
190 RecordsCount: Allow nscount == 0
191
192 Send a query to "allowns.recordscount.tests.powerdns.com.",
193 check that we are getting a valid response.
194 """
195 name = 'allowns.recordscount.tests.powerdns.com.'
196 query = dns.message.make_query(name, 'A', 'IN')
197 response = dns.message.make_response(query)
198 response.answer.append(dns.rrset.from_text(name,
199 3600,
200 dns.rdataclass.IN,
201 dns.rdatatype.A,
202 '127.0.0.1'))
203
204 for method in ("sendUDPQuery", "sendTCPQuery"):
205 sender = getattr(self, method)
206 (receivedQuery, receivedResponse) = sender(query, response)
207 self.assertTrue(receivedQuery)
208 self.assertTrue(receivedResponse)
209 receivedQuery.id = query.id
210 self.assertEquals(query, receivedQuery)
211 self.assertEquals(response, receivedResponse)
212
213 class TestRecordsCountNoOPTInAR(DNSDistTest):
214
215 _config_template = """
216 addAction(NotRule(RecordsTypeCountRule(DNSSection.Additional, DNSQType.OPT, 0, 0)), RCodeAction(DNSRCode.REFUSED))
217 newServer{address="127.0.0.1:%s"}
218 """
219
220 def testRecordsCountRefuseOPTinAR(self):
221 """
222 RecordsTypeCount: Refuse OPT in AR
223
224 Send a query to "refuseoptinar.recordscount.tests.powerdns.com.",
225 check that we are getting a REFUSED response.
226 """
227 name = 'refuseoptinar.recordscount.tests.powerdns.com.'
228 query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
229 expectedResponse = dns.message.make_response(query)
230 expectedResponse.set_rcode(dns.rcode.REFUSED)
231
232 for method in ("sendUDPQuery", "sendTCPQuery"):
233 sender = getattr(self, method)
234 (_, receivedResponse) = sender(query, response=None, useQueue=False)
235 self.assertEquals(receivedResponse, expectedResponse)
236
237 def testRecordsCountAllowNoOPTInAR(self):
238 """
239 RecordsTypeCount: Allow no OPT in AR
240
241 Send a query to "allownooptinar.recordscount.tests.powerdns.com.",
242 check that we are getting a valid response.
243 """
244 name = 'allowwnooptinar.recordscount.tests.powerdns.com.'
245 query = dns.message.make_query(name, 'A', 'IN')
246 response = dns.message.make_response(query)
247 response.answer.append(dns.rrset.from_text(name,
248 3600,
249 dns.rdataclass.IN,
250 dns.rdatatype.A,
251 '127.0.0.1'))
252
253 for method in ("sendUDPQuery", "sendTCPQuery"):
254 sender = getattr(self, method)
255 (receivedQuery, receivedResponse) = sender(query, response)
256 self.assertTrue(receivedQuery)
257 self.assertTrue(receivedResponse)
258 receivedQuery.id = query.id
259 self.assertEquals(query, receivedQuery)
260 self.assertEquals(response, receivedResponse)
261
262 def testRecordsCountAllowTwoARButNoOPT(self):
263 """
264 RecordsTypeCount: Allow arcount > 1 without OPT
265
266 Send a query to "allowtwoarnoopt.recordscount.tests.powerdns.com.",
267 check that we are getting a valid response.
268 """
269 name = 'allowtwoarnoopt.recordscount.tests.powerdns.com.'
270 query = dns.message.make_query(name, 'A', 'IN')
271 query.additional.append(dns.rrset.from_text(name,
272 3600,
273 dns.rdataclass.IN,
274 dns.rdatatype.A,
275 '127.0.0.1'))
276 query.additional.append(dns.rrset.from_text(name,
277 3600,
278 dns.rdataclass.IN,
279 dns.rdatatype.A,
280 '127.0.0.1'))
281
282 response = dns.message.make_response(query)
283 response.answer.append(dns.rrset.from_text(name,
284 3600,
285 dns.rdataclass.IN,
286 dns.rdatatype.A,
287 '127.0.0.1'))
288
289 for method in ("sendUDPQuery", "sendTCPQuery"):
290 sender = getattr(self, method)
291 (receivedQuery, receivedResponse) = sender(query, response)
292 self.assertTrue(receivedQuery)
293 self.assertTrue(receivedResponse)
294 receivedQuery.id = query.id
295 self.assertEquals(query, receivedQuery)
296 self.assertEquals(response, receivedResponse)