]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSOptions.py
3 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
7 class EDNSOptionsBase(DNSDistTest
):
8 _ednsTestFunction
= """
9 function testEDNSOptions(dq)
10 local options = dq:getEDNSOptions()
11 local qname = dq.qname:toString()
13 if string.match(qname, 'noedns') then
14 if next(options) ~= nil then
15 return DNSAction.Spoof, "192.0.2.255"
19 if string.match(qname, 'multiplecookies') then
20 if options[EDNSOptionCode.COOKIE] == nil then
21 return DNSAction.Spoof, "192.0.2.1"
23 if options[EDNSOptionCode.COOKIE]:count() ~= 2 then
24 return DNSAction.Spoof, "192.0.2.2"
26 if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
27 return DNSAction.Spoof, "192.0.2.3"
29 if options[EDNSOptionCode.COOKIE]:getValues()[2]:len() ~= 16 then
30 return DNSAction.Spoof, "192.0.2.4"
32 elseif string.match(qname, 'cookie') then
33 if options[EDNSOptionCode.COOKIE] == nil then
34 return DNSAction.Spoof, "192.0.2.1"
36 if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
37 return DNSAction.Spoof, "192.0.2.2"
41 if string.match(qname, 'ecs4') then
42 if options[EDNSOptionCode.ECS] == nil then
43 return DNSAction.Spoof, "192.0.2.51"
45 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 8 then
46 return DNSAction.Spoof, "192.0.2.52"
50 if string.match(qname, 'ecs6') then
51 if options[EDNSOptionCode.ECS] == nil then
52 return DNSAction.Spoof, "192.0.2.101"
54 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 20 then
55 return DNSAction.Spoof, "192.0.2.102"
59 return DNSAction.None, ""
64 class TestEDNSOptions(EDNSOptionsBase
):
66 _config_template
= """
69 addLuaAction(AllRule(), testEDNSOptions)
71 newServer{address="127.0.0.1:%s"}
73 _config_params
= ['_ednsTestFunction', '_testServerPort']
75 def testWithoutEDNS(self
):
79 name
= 'noedns.ednsoptions.tests.powerdns.com.'
80 query
= dns
.message
.make_query(name
, 'A', 'IN')
81 response
= dns
.message
.make_response(query
)
82 rrset
= dns
.rrset
.from_text(name
,
87 response
.answer
.append(rrset
)
89 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
90 self
.assertTrue(receivedQuery
)
91 self
.assertTrue(receivedResponse
)
92 receivedQuery
.id = query
.id
93 self
.assertEquals(receivedQuery
, query
)
94 self
.assertEquals(receivedResponse
, response
)
96 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
97 self
.assertTrue(receivedQuery
)
98 self
.assertTrue(receivedResponse
)
99 receivedQuery
.id = query
.id
100 self
.assertEquals(receivedQuery
, query
)
101 self
.assertEquals(receivedResponse
, response
)
103 def testCookie(self
):
107 name
= 'cookie.ednsoptions.tests.powerdns.com.'
108 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
109 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
])
110 response
= dns
.message
.make_response(query
)
111 rrset
= dns
.rrset
.from_text(name
,
116 response
.answer
.append(rrset
)
118 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
119 self
.assertTrue(receivedQuery
)
120 self
.assertTrue(receivedResponse
)
121 receivedQuery
.id = query
.id
122 self
.assertEquals(receivedQuery
, query
)
123 self
.assertEquals(receivedResponse
, response
)
125 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
126 self
.assertTrue(receivedQuery
)
127 self
.assertTrue(receivedResponse
)
128 receivedQuery
.id = query
.id
129 self
.assertEquals(receivedQuery
, query
)
130 self
.assertEquals(receivedResponse
, response
)
136 name
= 'ecs4.ednsoptions.tests.powerdns.com.'
137 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 32)
138 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
139 response
= dns
.message
.make_response(query
)
140 rrset
= dns
.rrset
.from_text(name
,
145 response
.answer
.append(rrset
)
147 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
148 self
.assertTrue(receivedQuery
)
149 self
.assertTrue(receivedResponse
)
150 receivedQuery
.id = query
.id
151 self
.assertEquals(receivedQuery
, query
)
152 self
.assertEquals(receivedResponse
, response
)
154 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
155 self
.assertTrue(receivedQuery
)
156 self
.assertTrue(receivedResponse
)
157 receivedQuery
.id = query
.id
158 self
.assertEquals(receivedQuery
, query
)
159 self
.assertEquals(receivedResponse
, response
)
165 name
= 'ecs6.ednsoptions.tests.powerdns.com.'
166 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
167 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
168 response
= dns
.message
.make_response(query
)
169 rrset
= dns
.rrset
.from_text(name
,
174 response
.answer
.append(rrset
)
176 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
177 self
.assertTrue(receivedQuery
)
178 self
.assertTrue(receivedResponse
)
179 receivedQuery
.id = query
.id
180 self
.assertEquals(receivedQuery
, query
)
181 self
.assertEquals(receivedResponse
, response
)
183 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
184 self
.assertTrue(receivedQuery
)
185 self
.assertTrue(receivedResponse
)
186 receivedQuery
.id = query
.id
187 self
.assertEquals(receivedQuery
, query
)
188 self
.assertEquals(receivedResponse
, response
)
190 def testECS6Cookie(self
):
192 EDNS Options: Cookie + ECS6
194 name
= 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
195 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
196 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
197 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
,eco
])
198 response
= dns
.message
.make_response(query
)
199 rrset
= dns
.rrset
.from_text(name
,
204 response
.answer
.append(rrset
)
206 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
207 self
.assertTrue(receivedQuery
)
208 self
.assertTrue(receivedResponse
)
209 receivedQuery
.id = query
.id
210 self
.assertEquals(receivedQuery
, query
)
211 self
.assertEquals(receivedResponse
, response
)
213 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
214 self
.assertTrue(receivedQuery
)
215 self
.assertTrue(receivedResponse
)
216 receivedQuery
.id = query
.id
217 self
.assertEquals(receivedQuery
, query
)
218 self
.assertEquals(receivedResponse
, response
)
220 def testMultiCookiesECS6(self
):
222 EDNS Options: Two Cookies + ECS6
224 name
= 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
225 eco1
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
226 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
227 eco2
= cookiesoption
.CookiesOption(b
'deadc0de', b
'deadc0de')
228 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco1
, ecso
, eco2
])
229 response
= dns
.message
.make_response(query
)
230 rrset
= dns
.rrset
.from_text(name
,
235 response
.answer
.append(rrset
)
237 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
238 self
.assertTrue(receivedQuery
)
239 self
.assertTrue(receivedResponse
)
240 receivedQuery
.id = query
.id
241 self
.assertEquals(receivedQuery
, query
)
242 self
.assertEquals(receivedResponse
, response
)
244 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
245 self
.assertTrue(receivedQuery
)
246 self
.assertTrue(receivedResponse
)
247 receivedQuery
.id = query
.id
248 self
.assertEquals(receivedQuery
, query
)
249 self
.assertEquals(receivedResponse
, response
)
251 class TestEDNSOptionsAddingECS(EDNSOptionsBase
):
253 _config_template
= """
256 addLuaAction(AllRule(), testEDNSOptions)
258 newServer{address="127.0.0.1:%s", useClientSubnet=true}
260 _config_params
= ['_ednsTestFunction', '_testServerPort']
262 def testWithoutEDNS(self
):
264 EDNS Options: No EDNS (adding ECS)
266 name
= 'noedns.ednsoptions-ecs.tests.powerdns.com.'
267 query
= dns
.message
.make_query(name
, 'A', 'IN')
268 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
269 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[ecso
], payload
=512)
270 response
= dns
.message
.make_response(query
)
271 rrset
= dns
.rrset
.from_text(name
,
276 response
.answer
.append(rrset
)
278 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
279 self
.assertTrue(receivedQuery
)
280 self
.assertTrue(receivedResponse
)
281 receivedQuery
.id = expectedQuery
.id
282 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
283 self
.checkResponseNoEDNS(response
, receivedResponse
)
285 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
286 self
.assertTrue(receivedQuery
)
287 self
.assertTrue(receivedResponse
)
288 receivedQuery
.id = expectedQuery
.id
289 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
290 self
.checkResponseNoEDNS(response
, receivedResponse
)
292 def testCookie(self
):
294 EDNS Options: Cookie (adding ECS)
296 name
= 'cookie.ednsoptions-ecs.tests.powerdns.com.'
297 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
298 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco
])
299 ecso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
300 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, options
=[eco
,ecso
], payload
=512)
301 response
= dns
.message
.make_response(query
)
302 rrset
= dns
.rrset
.from_text(name
,
307 response
.answer
.append(rrset
)
309 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
310 self
.assertTrue(receivedQuery
)
311 self
.assertTrue(receivedResponse
)
312 receivedQuery
.id = expectedQuery
.id
313 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 1)
314 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
316 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
317 self
.assertTrue(receivedQuery
)
318 self
.assertTrue(receivedResponse
)
319 receivedQuery
.id = expectedQuery
.id
320 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
, 1)
321 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
325 EDNS Options: ECS4 (adding ECS)
327 name
= 'ecs4.ednsoptions-ecs.tests.powerdns.com.'
328 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 32)
329 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
330 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('1.2.3.4', 24, scope
=24)
331 response
= dns
.message
.make_response(query
)
332 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
333 rrset
= dns
.rrset
.from_text(name
,
338 response
.answer
.append(rrset
)
340 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
341 self
.assertTrue(receivedQuery
)
342 self
.assertTrue(receivedResponse
)
343 receivedQuery
.id = query
.id
344 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
345 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
347 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
348 self
.assertTrue(receivedQuery
)
349 self
.assertTrue(receivedResponse
)
350 receivedQuery
.id = query
.id
351 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
352 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
356 EDNS Options: ECS6 (adding ECS)
358 name
= 'ecs6.ednsoptions-ecs.tests.powerdns.com.'
359 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
360 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
])
361 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128, scope
=56)
362 response
= dns
.message
.make_response(query
)
363 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
364 rrset
= dns
.rrset
.from_text(name
,
369 response
.answer
.append(rrset
)
371 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
372 self
.assertTrue(receivedQuery
)
373 self
.assertTrue(receivedResponse
)
374 receivedQuery
.id = query
.id
375 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
376 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
378 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
379 self
.assertTrue(receivedQuery
)
380 self
.assertTrue(receivedResponse
)
381 receivedQuery
.id = query
.id
382 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
383 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
385 def testECS6Cookie(self
):
387 EDNS Options: Cookie + ECS6 (adding ECS)
389 name
= 'cookie-ecs6.ednsoptions-ecs.tests.powerdns.com.'
390 eco
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
391 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
392 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[ecso
,eco
])
393 ecsoResponse
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128, scope
=56)
394 response
= dns
.message
.make_response(query
)
395 response
.use_edns(edns
=True, payload
=4096, options
=[ecsoResponse
])
396 rrset
= dns
.rrset
.from_text(name
,
401 response
.answer
.append(rrset
)
403 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
404 self
.assertTrue(receivedQuery
)
405 self
.assertTrue(receivedResponse
)
406 receivedQuery
.id = query
.id
407 self
.checkQueryEDNSWithECS(query
, receivedQuery
, 1)
408 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
410 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
411 self
.assertTrue(receivedQuery
)
412 self
.assertTrue(receivedResponse
)
413 receivedQuery
.id = query
.id
414 self
.checkQueryEDNSWithECS(query
, receivedQuery
, 1)
415 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
417 def testMultiCookiesECS6(self
):
419 EDNS Options: Two Cookies + ECS6
421 name
= 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
422 eco1
= cookiesoption
.CookiesOption(b
'deadbeef', b
'deadbeef')
423 ecso
= clientsubnetoption
.ClientSubnetOption('2001:DB8::1', 128)
424 eco2
= cookiesoption
.CookiesOption(b
'deadc0de', b
'deadc0de')
425 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[eco1
, ecso
, eco2
])
426 response
= dns
.message
.make_response(query
)
427 rrset
= dns
.rrset
.from_text(name
,
432 response
.answer
.append(rrset
)
434 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
435 self
.assertTrue(receivedQuery
)
436 self
.assertTrue(receivedResponse
)
437 receivedQuery
.id = query
.id
438 self
.assertEquals(receivedQuery
, query
)
439 self
.assertEquals(receivedResponse
, response
)
441 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
442 self
.assertTrue(receivedQuery
)
443 self
.assertTrue(receivedResponse
)
444 receivedQuery
.id = query
.id
445 self
.assertEquals(receivedQuery
, query
)
446 self
.assertEquals(receivedResponse
, response
)