]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_EDNSOptions.py
Merge pull request #8804 from rgacogne/ddist-install-config-file
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSOptions.py
CommitLineData
cbf4e13a
RG
1#!/usr/bin/env python
2import dns
3import clientsubnetoption
4import cookiesoption
5from dnsdisttests import DNSDistTest
6
class EDNSOptionsBase(DNSDistTest):
    """
    Common base for the EDNS options tests.

    It only carries the Lua rule below; the test classes in this file inject
    it into their dnsdist configuration by listing '_ednsTestFunction' in
    their `_config_params`.
    """

    # Lua rule run by dnsdist for every query. It picks the expectations to
    # verify from substrings of the query name and, when one of them is not
    # met, spoofs an answer whose address identifies the failed check:
    #   'noedns'          -> no EDNS option at all            (192.0.2.255)
    #   'multiplecookies' -> exactly two COOKIE options, each
    #                        with a 16 bytes value            (192.0.2.1-4)
    #   'cookie'          -> exactly one COOKIE option with a
    #                        16 bytes value                   (192.0.2.1-2)
    #   'ecs4'            -> exactly one ECS option with an
    #                        8 bytes value                    (192.0.2.51-52)
    #   'ecs6'            -> exactly one ECS option with a
    #                        20 bytes value                   (192.0.2.101-102)
    # When every applicable check passes it returns DNSAction.None.
    # 'multiplecookies' is tested before 'cookie' (if / elseif) because the
    # former name also matches the 'cookie' pattern.
    _ednsTestFunction = """
    function testEDNSOptions(dq)
      local options = dq:getEDNSOptions()
      local qname = dq.qname:toString()

      if string.match(qname, 'noedns') then
        if next(options) ~= nil then
          return DNSAction.Spoof, "192.0.2.255"
        end
      end

      if string.match(qname, 'multiplecookies') then
        if options[EDNSOptionCode.COOKIE] == nil then
          return DNSAction.Spoof, "192.0.2.1"
        end
        if options[EDNSOptionCode.COOKIE]:count() ~= 2 then
          return DNSAction.Spoof, "192.0.2.2"
        end
        if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
          return DNSAction.Spoof, "192.0.2.3"
        end
        if options[EDNSOptionCode.COOKIE]:getValues()[2]:len() ~= 16 then
          return DNSAction.Spoof, "192.0.2.4"
        end
      elseif string.match(qname, 'cookie') then
        if options[EDNSOptionCode.COOKIE] == nil then
          return DNSAction.Spoof, "192.0.2.1"
        end
        if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
          return DNSAction.Spoof, "192.0.2.2"
        end
      end

      if string.match(qname, 'ecs4') then
        if options[EDNSOptionCode.ECS] == nil then
          return DNSAction.Spoof, "192.0.2.51"
        end
        if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 8 then
          return DNSAction.Spoof, "192.0.2.52"
        end
      end

      if string.match(qname, 'ecs6') then
        if options[EDNSOptionCode.ECS] == nil then
          return DNSAction.Spoof, "192.0.2.101"
        end
        if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 20 then
          return DNSAction.Spoof, "192.0.2.102"
        end
      end

      return DNSAction.None, ""

    end
    """
63
class TestEDNSOptions(EDNSOptionsBase):
    """
    Runs the testEDNSOptions Lua rule in front of a plain backend and checks
    that queries and responses go through dnsdist unchanged, over UDP and TCP.

    If the Lua rule spoofed an answer, the received response would differ
    from the one the tests hand to the backend.
    """

    _config_template = """
    %s

    addAction(AllRule(), LuaAction(testEDNSOptions))

    newServer{address="127.0.0.1:%s"}
    """
    _config_params = ['_ednsTestFunction', '_testServerPort']

    def _makeAResponse(self, query, name, address):
        """
        Build the response to `query` holding a single A record
        (TTL 3600, class IN) for `name` pointing to `address`.
        """
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    address)
        response.answer.append(rrset)
        return response

    def _checkForwardedUnchanged(self, query, response):
        """
        Send `query` over UDP then TCP, with `response` as the backend
        answer, and assert that both messages were received as sent.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            # align the ID before comparing the received query with ours
            receivedQuery.id = query.id
            self.assertEqual(receivedQuery, query)
            self.assertEqual(receivedResponse, response)

    def testWithoutEDNS(self):
        """
        EDNS Options: No EDNS
        """
        name = 'noedns.ednsoptions.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        response = self._makeAResponse(query, name, '192.0.2.255')

        self._checkForwardedUnchanged(query, response)

    def testCookie(self):
        """
        EDNS Options: Cookie
        """
        name = 'cookie.ednsoptions.tests.powerdns.com.'
        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
        response = self._makeAResponse(query, name, '127.0.0.1')

        self._checkForwardedUnchanged(query, response)

    def testECS4(self):
        """
        EDNS Options: ECS4
        """
        name = 'ecs4.ednsoptions.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 32)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = self._makeAResponse(query, name, '127.0.0.1')

        self._checkForwardedUnchanged(query, response)

    def testECS6(self):
        """
        EDNS Options: ECS6
        """
        name = 'ecs6.ednsoptions.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        response = self._makeAResponse(query, name, '127.0.0.1')

        self._checkForwardedUnchanged(query, response)

    def testECS6Cookie(self):
        """
        EDNS Options: Cookie + ECS6
        """
        name = 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
        response = self._makeAResponse(query, name, '127.0.0.1')

        self._checkForwardedUnchanged(query, response)

    def testMultiCookiesECS6(self):
        """
        EDNS Options: Two Cookies + ECS6
        """
        name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
        eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
        eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
        response = self._makeAResponse(query, name, '127.0.0.1')

        self._checkForwardedUnchanged(query, response)
cbf4e13a
RG
220
class TestEDNSOptionsAddingECS(EDNSOptionsBase):
    """
    Same scenarios as the plain EDNS options tests, but against a backend
    declared with useClientSubnet=true.

    The tests expect queries that carry no ECS option to reach the backend
    with a 127.0.0.1/24 ECS option added, and queries that already carry one
    to be checked against the query as it was sent.
    """

    _config_template = """
    %s

    addAction(AllRule(), LuaAction(testEDNSOptions))

    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    """
    _config_params = ['_ednsTestFunction', '_testServerPort']

    def _makeAResponse(self, query, name, address):
        """
        Build the response to `query` holding a single A record
        (TTL 3600, class IN) for `name` pointing to `address`.
        """
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    address)
        response.answer.append(rrset)
        return response

    def _exchangeOverUDPAndTCP(self, query, response, expectedQuery):
        """
        Send `query` over UDP then TCP with `response` as the backend answer.

        For each transport, assert that a query and a response were
        received, set the received query ID to the one of `expectedQuery`
        and yield the (receivedQuery, receivedResponse) pair. This is a
        generator, so the caller's checks for UDP run before the TCP query
        is sent.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            (receivedQuery, receivedResponse) = sender(query, response)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = expectedQuery.id
            yield (receivedQuery, receivedResponse)

    def testWithoutEDNS(self):
        """
        EDNS Options: No EDNS (adding ECS)
        """
        name = 'noedns.ednsoptions-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
        response = self._makeAResponse(query, name, '127.0.0.1')

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response, expectedQuery):
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
            self.checkResponseNoEDNS(response, receivedResponse)

    def testCookie(self):
        """
        EDNS Options: Cookie (adding ECS)
        """
        name = 'cookie.ednsoptions-ecs.tests.powerdns.com.'
        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[eco])
        ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[eco,ecso], payload=512)
        response = self._makeAResponse(query, name, '127.0.0.1')

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response, expectedQuery):
            # NOTE(review): the third argument is presumably the number of
            # options expected in addition to ECS (here the cookie) - confirm
            # against dnsdisttests.
            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
            self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testECS4(self):
        """
        EDNS Options: ECS4 (adding ECS)
        """
        name = 'ecs4.ednsoptions-ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 32)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        ecsoResponse = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24, scope=24)
        response = self._makeAResponse(query, name, '127.0.0.1')
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse])

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response, query):
            self.checkQueryEDNSWithECS(query, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testECS6(self):
        """
        EDNS Options: ECS6 (adding ECS)
        """
        name = 'ecs6.ednsoptions-ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
        ecsoResponse = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128, scope=56)
        response = self._makeAResponse(query, name, '127.0.0.1')
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse])

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response, query):
            self.checkQueryEDNSWithECS(query, receivedQuery)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testECS6Cookie(self):
        """
        EDNS Options: Cookie + ECS6 (adding ECS)
        """
        name = 'cookie-ecs6.ednsoptions-ecs.tests.powerdns.com.'
        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
        ecsoResponse = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128, scope=56)
        response = self._makeAResponse(query, name, '127.0.0.1')
        response.use_edns(edns=True, payload=4096, options=[ecsoResponse])

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response, query):
            # NOTE(review): the third argument is presumably the number of
            # options expected in addition to ECS (here the cookie) - confirm
            # against dnsdisttests.
            self.checkQueryEDNSWithECS(query, receivedQuery, 1)
            self.checkResponseEDNSWithECS(response, receivedResponse)

    def testMultiCookiesECS6(self):
        """
        EDNS Options: Two Cookies + ECS6 (adding ECS)
        """
        name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
        eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
        ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
        eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de')
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
        response = self._makeAResponse(query, name, '127.0.0.1')

        for (receivedQuery, receivedResponse) in self._exchangeOverUDPAndTCP(query, response, query):
            # the query already carries an ECS option, so it is expected to
            # reach the backend exactly as sent
            self.assertEqual(receivedQuery, query)
            self.assertEqual(receivedResponse, response)