]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSOptions.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class EDNSOptionsBase(DNSDistTest
):
8 _ednsTestFunction
= """
9 function testEDNSOptions(dq)
10 local options = dq:getEDNSOptions()
11 local qname = dq.qname:toString()
13 if string.match(qname, 'noedns') then
14 if next(options) ~= nil then
15 return DNSAction.Spoof, "192.0.2.255"
19 if string.match(qname, 'multiplecookies') then
20 if options[EDNSOptionCode.COOKIE] == nil then
21 return DNSAction.Spoof, "192.0.2.1"
23 if options[EDNSOptionCode.COOKIE]:count() ~= 2 then
24 return DNSAction.Spoof, "192.0.2.2"
26 if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
27 return DNSAction.Spoof, "192.0.2.3"
29 if options[EDNSOptionCode.COOKIE]:getValues()[2]:len() ~= 16 then
30 return DNSAction.Spoof, "192.0.2.4"
32 elseif string.match(qname, 'cookie') then
33 if options[EDNSOptionCode.COOKIE] == nil then
34 return DNSAction.Spoof, "192.0.2.1"
36 if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
37 return DNSAction.Spoof, "192.0.2.2"
41 if string.match(qname, 'ecs4') then
42 if options[EDNSOptionCode.ECS] == nil then
43 return DNSAction.Spoof, "192.0.2.51"
45 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 8 then
46 return DNSAction.Spoof, "192.0.2.52"
50 if string.match(qname, 'ecs6') then
51 if options[EDNSOptionCode.ECS] == nil then
52 return DNSAction.Spoof, "192.0.2.101"
54 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 20 then
55 return DNSAction.Spoof, "192.0.2.102"
59 return DNSAction.None, ""
64 class TestEDNSOptions(EDNSOptionsBase
):
66 _config_template
= """
69 addAction(AllRule(), LuaAction(testEDNSOptions))
71 newServer{address="127.0.0.1:%s"}
73 _config_params
= ['_ednsTestFunction', '_testServerPort']
75 def testWithoutEDNS(self
):
79 name
= 'noedns.ednsoptions.tests.powerdns.com.'
80 query
= dns
.message
.make_query(name
, 'A', 'IN')
81 response
= dns
.message
.make_response(query
)
82 rrset
= dns
.rrset
.from_text(name
,
87 response
.answer
.append(rrset
)
89 for method
in ("sendUDPQuery", "sendTCPQuery"):
90 sender
= getattr(self
, method
)
91 (receivedQuery
, receivedResponse
) = sender(query
, response
)
92 self
.assertTrue(receivedQuery
)
93 self
.assertTrue(receivedResponse
)
94 receivedQuery
.id = query
.id
95 self
.assertEquals(receivedQuery
, query
)
96 self
.assertEquals(receivedResponse
, response
)
102 name
= 'cookie.ednsoptions.tests.powerdns.com.'
103 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
104 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
])
105 response
= dns
.message
.make_response(query
)
106 rrset
= dns
.rrset
.from_text(name
,
111 response
.answer
.append(rrset
)
113 for method
in ("sendUDPQuery", "sendTCPQuery"):
114 sender
= getattr(self
, method
)
115 (receivedQuery
, receivedResponse
) = sender(query
, response
)
116 self
.assertTrue(receivedQuery
)
117 self
.assertTrue(receivedResponse
)
118 receivedQuery
.id = query
.id
119 self
.assertEquals(receivedQuery
, query
)
120 self
.assertEquals(receivedResponse
, response
)
126 name
= 'ecs4.ednsoptions.tests.powerdns.com.'
127 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 32)
128 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
129 response
= dns
.message
.make_response(query
)
130 rrset
= dns
.rrset
.from_text(name
,
135 response
.answer
.append(rrset
)
137 for method
in ("sendUDPQuery", "sendTCPQuery"):
138 sender
= getattr(self
, method
)
139 (receivedQuery
, receivedResponse
) = sender(query
, response
)
140 self
.assertTrue(receivedQuery
)
141 self
.assertTrue(receivedResponse
)
142 receivedQuery
.id = query
.id
143 self
.assertEquals(receivedQuery
, query
)
144 self
.assertEquals(receivedResponse
, response
)
150 name
= 'ecs6.ednsoptions.tests.powerdns.com.'
151 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
152 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
153 response
= dns
.message
.make_response(query
)
154 rrset
= dns
.rrset
.from_text(name
,
159 response
.answer
.append(rrset
)
161 for method
in ("sendUDPQuery", "sendTCPQuery"):
162 sender
= getattr(self
, method
)
163 (receivedQuery
, receivedResponse
) = sender(query
, response
)
164 self
.assertTrue(receivedQuery
)
165 self
.assertTrue(receivedResponse
)
166 receivedQuery
.id = query
.id
167 self
.assertEquals(receivedQuery
, query
)
168 self
.assertEquals(receivedResponse
, response
)
170 def testECS6Cookie(self
):
172 EDNS Options: Cookie + ECS6
174 name
= 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
175 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
176 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
177 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
,eco
])
178 response
= dns
.message
.make_response(query
)
179 rrset
= dns
.rrset
.from_text(name
,
184 response
.answer
.append(rrset
)
186 for method
in ("sendUDPQuery", "sendTCPQuery"):
187 sender
= getattr(self
, method
)
188 (receivedQuery
, receivedResponse
) = sender(query
, response
)
189 self
.assertTrue(receivedQuery
)
190 self
.assertTrue(receivedResponse
)
191 receivedQuery
.id = query
.id
192 self
.assertEquals(receivedQuery
, query
)
193 self
.assertEquals(receivedResponse
, response
)
195 def testMultiCookiesECS6(self
):
197 EDNS Options: Two Cookies + ECS6
199 name
= 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
200 eco1
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
201 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
202 eco2
= cookiesoption
.CookiesOption(b
'deadc0de', b
'deadc0de')
203 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco1
, ecso
, eco2
])
204 response
= dns
.message
.make_response(query
)
205 rrset
= dns
.rrset
.from_text(name
,
210 response
.answer
.append(rrset
)
212 for method
in ("sendUDPQuery", "sendTCPQuery"):
213 sender
= getattr(self
, method
)
214 (receivedQuery
, receivedResponse
) = sender(query
, response
)
215 self
.assertTrue(receivedQuery
)
216 self
.assertTrue(receivedResponse
)
217 receivedQuery
.id = query
.id
218 self
.assertEquals(receivedQuery
, query
)
219 self
.assertEquals(receivedResponse
, response
)
221 class TestEDNSOptionsAddingECS(EDNSOptionsBase
):
223 _config_template
= """
226 addAction(AllRule(), LuaAction(testEDNSOptions))
228 newServer{address="127.0.0.1:%s", useClientSubnet=true}
230 _config_params
= ['_ednsTestFunction', '_testServerPort']
232 def testWithoutEDNS(self
):
234 EDNS Options: No EDNS (adding ECS)
236 name
= 'noedns.ednsoptions-ecs.tests.powerdns.com.'
237 query
= dns
.message
.make_query(name
, 'A', 'IN')
238 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
239 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
240 response
= dns
.message
.make_response(query
)
241 rrset
= dns
.rrset
.from_text(name
,
246 response
.answer
.append(rrset
)
248 for method
in ("sendUDPQuery", "sendTCPQuery"):
249 sender
= getattr(self
, method
)
250 (receivedQuery
, receivedResponse
) = sender(query
, response
)
251 self
.assertTrue(receivedQuery
)
252 self
.assertTrue(receivedResponse
)
253 receivedQuery
.id = expectedQuery
.id
254 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
255 self
.checkResponseNoEDNS(response
, receivedResponse
)
257 def testCookie(self
):
259 EDNS Options: Cookie (adding ECS)
261 name
= 'cookie.ednsoptions-ecs.tests.powerdns.com.'
262 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
263 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
])
264 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
265 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[eco
,ecso
], payload
=512)
266 response
= dns
.message
.make_response(query
)
267 rrset
= dns
.rrset
.from_text(name
,
272 response
.answer
.append(rrset
)
274 for method
in ("sendUDPQuery", "sendTCPQuery"):
275 sender
= getattr(self
, method
)
276 (receivedQuery
, receivedResponse
) = sender(query
, response
)
277 self
.assertTrue(receivedQuery
)
278 self
.assertTrue(receivedResponse
)
279 receivedQuery
.id = expectedQuery
.id
280 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 1)
281 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
285 EDNS Options: ECS4 (adding ECS)
287 name
= 'ecs4.ednsoptions-ecs.tests.powerdns.com.'
288 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 32)
289 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
290 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 24, scope
=24)
291 response
= dns
.message
.make_response(query
)
292 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
293 rrset
= dns
.rrset
.from_text(name
,
298 response
.answer
.append(rrset
)
300 for method
in ("sendUDPQuery", "sendTCPQuery"):
301 sender
= getattr(self
, method
)
302 (receivedQuery
, receivedResponse
) = sender(query
, response
)
303 self
.assertTrue(receivedQuery
)
304 self
.assertTrue(receivedResponse
)
305 receivedQuery
.id = query
.id
306 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
307 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
311 EDNS Options: ECS6 (adding ECS)
313 name
= 'ecs6.ednsoptions-ecs.tests.powerdns.com.'
314 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
315 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
316 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128, scope
=56)
317 response
= dns
.message
.make_response(query
)
318 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
319 rrset
= dns
.rrset
.from_text(name
,
324 response
.answer
.append(rrset
)
326 for method
in ("sendUDPQuery", "sendTCPQuery"):
327 sender
= getattr(self
, method
)
328 (receivedQuery
, receivedResponse
) = sender(query
, response
)
329 self
.assertTrue(receivedQuery
)
330 self
.assertTrue(receivedResponse
)
331 receivedQuery
.id = query
.id
332 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
333 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
335 def testECS6Cookie(self
):
337 EDNS Options: Cookie + ECS6 (adding ECS)
339 name
= 'cookie-ecs6.ednsoptions-ecs.tests.powerdns.com.'
340 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
341 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
342 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
,eco
])
343 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128, scope
=56)
344 response
= dns
.message
.make_response(query
)
345 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
346 rrset
= dns
.rrset
.from_text(name
,
351 response
.answer
.append(rrset
)
353 for method
in ("sendUDPQuery", "sendTCPQuery"):
354 sender
= getattr(self
, method
)
355 (receivedQuery
, receivedResponse
) = sender(query
, response
)
356 self
.assertTrue(receivedQuery
)
357 self
.assertTrue(receivedResponse
)
358 receivedQuery
.id = query
.id
359 self
.checkQueryEDNSWithECS(query
, receivedQuery
, 1)
360 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
362 def testMultiCookiesECS6(self
):
364 EDNS Options: Two Cookies + ECS6
366 name
= 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
367 eco1
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
368 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
369 eco2
= cookiesoption
.CookiesOption(b
'deadc0de', b
'deadc0de')
370 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco1
, ecso
, eco2
])
371 response
= dns
.message
.make_response(query
)
372 rrset
= dns
.rrset
.from_text(name
,
377 response
.answer
.append(rrset
)
379 for method
in ("sendUDPQuery", "sendTCPQuery"):
380 sender
= getattr(self
, method
)
381 (receivedQuery
, receivedResponse
) = sender(query
, response
)
382 self
.assertTrue(receivedQuery
)
383 self
.assertTrue(receivedResponse
)
384 receivedQuery
.id = query
.id
385 self
.assertEquals(receivedQuery
, query
)
386 self
.assertEquals(receivedResponse
, response
)