]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_EDNSOptions.py
Merge pull request #7053 from mind04/doc-warnings
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSOptions.py
1 #!/usr/bin/env python
2 import dns
3 import clientsubnetoption
4 import cookiesoption
5 from dnsdisttests import DNSDistTest
6
7 class EDNSOptionsBase(DNSDistTest):
8 _ednsTestFunction = """
9 function testEDNSOptions(dq)
10 local options = dq:getEDNSOptions()
11 local qname = dq.qname:toString()
12
13 if string.match(qname, 'noedns') then
14 if next(options) ~= nil then
15 return DNSAction.Spoof, "192.0.2.255"
16 end
17 end
18
19 if string.match(qname, 'multiplecookies') then
20 if options[EDNSOptionCode.COOKIE] == nil then
21 return DNSAction.Spoof, "192.0.2.1"
22 end
23 if options[EDNSOptionCode.COOKIE]:count() ~= 2 then
24 return DNSAction.Spoof, "192.0.2.2"
25 end
26 if options[EDNSOptionCode.COOKIE]:getValues()[0]:len() ~= 16 then
27 return DNSAction.Spoof, "192.0.2.3"
28 end
29 if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
30 return DNSAction.Spoof, "192.0.2.4"
31 end
32 elseif string.match(qname, 'cookie') then
33 if options[EDNSOptionCode.COOKIE] == nil then
34 return DNSAction.Spoof, "192.0.2.1"
35 end
36 if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[0]:len() ~= 16 then
37 return DNSAction.Spoof, "192.0.2.2"
38 end
39 end
40
41 if string.match(qname, 'ecs4') then
42 if options[EDNSOptionCode.ECS] == nil then
43 return DNSAction.Spoof, "192.0.2.51"
44 end
45 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[0]:len() ~= 8 then
46 return DNSAction.Spoof, "192.0.2.52"
47 end
48 end
49
50 if string.match(qname, 'ecs6') then
51 if options[EDNSOptionCode.ECS] == nil then
52 return DNSAction.Spoof, "192.0.2.101"
53 end
54 if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[0]:len() ~= 20 then
55 return DNSAction.Spoof, "192.0.2.102"
56 end
57 end
58
59 return DNSAction.None, ""
60
61 end
62 """
63
64 class TestEDNSOptions(EDNSOptionsBase):
65
66 _config_template = """
67 %s
68
69 addLuaAction(AllRule(), testEDNSOptions)
70
71 newServer{address="127.0.0.1:%s"}
72 """
73 _config_params = ['_ednsTestFunction', '_testServerPort']
74
75 def testWithoutEDNS(self):
76 """
77 EDNS Options: No EDNS
78 """
79 name = 'noedns.ednsoptions.tests.powerdns.com.'
80 query = dns.message.make_query(name, 'A', 'IN')
81 response = dns.message.make_response(query)
82 rrset = dns.rrset.from_text(name,
83 3600,
84 dns.rdataclass.IN,
85 dns.rdatatype.A,
86 '192.0.2.255')
87 response.answer.append(rrset)
88
89 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
90 self.assertTrue(receivedQuery)
91 self.assertTrue(receivedResponse)
92 receivedQuery.id = query.id
93 self.assertEquals(receivedQuery, query)
94 self.assertEquals(receivedResponse, response)
95
96 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
97 self.assertTrue(receivedQuery)
98 self.assertTrue(receivedResponse)
99 receivedQuery.id = query.id
100 self.assertEquals(receivedQuery, query)
101 self.assertEquals(receivedResponse, response)
102
103 def testCookie(self):
104 """
105 EDNS Options: Cookie
106 """
107 name = 'cookie.ednsoptions.tests.powerdns.com.'
108 eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
109 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
110 response = dns.message.make_response(query)
111 rrset = dns.rrset.from_text(name,
112 3600,
113 dns.rdataclass.IN,
114 dns.rdatatype.A,
115 '127.0.0.1')
116 response.answer.append(rrset)
117
118 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
119 self.assertTrue(receivedQuery)
120 self.assertTrue(receivedResponse)
121 receivedQuery.id = query.id
122 self.assertEquals(receivedQuery, query)
123 self.assertEquals(receivedResponse, response)
124
125 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
126 self.assertTrue(receivedQuery)
127 self.assertTrue(receivedResponse)
128 receivedQuery.id = query.id
129 self.assertEquals(receivedQuery, query)
130 self.assertEquals(receivedResponse, response)
131
132 def testECS4(self):
133 """
134 EDNS Options: ECS4
135 """
136 name = 'ecs4.ednsoptions.tests.powerdns.com.'
137 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 32)
138 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
139 response = dns.message.make_response(query)
140 rrset = dns.rrset.from_text(name,
141 3600,
142 dns.rdataclass.IN,
143 dns.rdatatype.A,
144 '127.0.0.1')
145 response.answer.append(rrset)
146
147 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
148 self.assertTrue(receivedQuery)
149 self.assertTrue(receivedResponse)
150 receivedQuery.id = query.id
151 self.assertEquals(receivedQuery, query)
152 self.assertEquals(receivedResponse, response)
153
154 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
155 self.assertTrue(receivedQuery)
156 self.assertTrue(receivedResponse)
157 receivedQuery.id = query.id
158 self.assertEquals(receivedQuery, query)
159 self.assertEquals(receivedResponse, response)
160
161 def testECS6(self):
162 """
163 EDNS Options: ECS6
164 """
165 name = 'ecs6.ednsoptions.tests.powerdns.com.'
166 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
167 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
168 response = dns.message.make_response(query)
169 rrset = dns.rrset.from_text(name,
170 3600,
171 dns.rdataclass.IN,
172 dns.rdatatype.A,
173 '127.0.0.1')
174 response.answer.append(rrset)
175
176 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
177 self.assertTrue(receivedQuery)
178 self.assertTrue(receivedResponse)
179 receivedQuery.id = query.id
180 self.assertEquals(receivedQuery, query)
181 self.assertEquals(receivedResponse, response)
182
183 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
184 self.assertTrue(receivedQuery)
185 self.assertTrue(receivedResponse)
186 receivedQuery.id = query.id
187 self.assertEquals(receivedQuery, query)
188 self.assertEquals(receivedResponse, response)
189
190 def testECS6Cookie(self):
191 """
192 EDNS Options: Cookie + ECS6
193 """
194 name = 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
195 eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
196 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
197 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
198 response = dns.message.make_response(query)
199 rrset = dns.rrset.from_text(name,
200 3600,
201 dns.rdataclass.IN,
202 dns.rdatatype.A,
203 '127.0.0.1')
204 response.answer.append(rrset)
205
206 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
207 self.assertTrue(receivedQuery)
208 self.assertTrue(receivedResponse)
209 receivedQuery.id = query.id
210 self.assertEquals(receivedQuery, query)
211 self.assertEquals(receivedResponse, response)
212
213 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
214 self.assertTrue(receivedQuery)
215 self.assertTrue(receivedResponse)
216 receivedQuery.id = query.id
217 self.assertEquals(receivedQuery, query)
218 self.assertEquals(receivedResponse, response)
219
220 def testMultiCookiesECS6(self):
221 """
222 EDNS Options: Two Cookies + ECS6
223 """
224 name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
225 eco1 = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
226 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
227 eco2 = cookiesoption.CookiesOption('deadc0de', 'deadc0de')
228 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
229 response = dns.message.make_response(query)
230 rrset = dns.rrset.from_text(name,
231 3600,
232 dns.rdataclass.IN,
233 dns.rdatatype.A,
234 '127.0.0.1')
235 response.answer.append(rrset)
236
237 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
238 self.assertTrue(receivedQuery)
239 self.assertTrue(receivedResponse)
240 receivedQuery.id = query.id
241 self.assertEquals(receivedQuery, query)
242 self.assertEquals(receivedResponse, response)
243
244 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
245 self.assertTrue(receivedQuery)
246 self.assertTrue(receivedResponse)
247 receivedQuery.id = query.id
248 self.assertEquals(receivedQuery, query)
249 self.assertEquals(receivedResponse, response)
250
251 class TestEDNSOptionsAddingECS(EDNSOptionsBase):
252
253 _config_template = """
254 %s
255
256 addLuaAction(AllRule(), testEDNSOptions)
257
258 newServer{address="127.0.0.1:%s", useClientSubnet=true}
259 """
260 _config_params = ['_ednsTestFunction', '_testServerPort']
261
262 def testWithoutEDNS(self):
263 """
264 EDNS Options: No EDNS (adding ECS)
265 """
266 name = 'noedns.ednsoptions-ecs.tests.powerdns.com.'
267 query = dns.message.make_query(name, 'A', 'IN')
268 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
269 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[ecso], payload=512)
270 response = dns.message.make_response(query)
271 rrset = dns.rrset.from_text(name,
272 3600,
273 dns.rdataclass.IN,
274 dns.rdatatype.A,
275 '127.0.0.1')
276 response.answer.append(rrset)
277
278 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
279 self.assertTrue(receivedQuery)
280 self.assertTrue(receivedResponse)
281 receivedQuery.id = expectedQuery.id
282 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
283 self.checkResponseNoEDNS(response, receivedResponse)
284
285 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
286 self.assertTrue(receivedQuery)
287 self.assertTrue(receivedResponse)
288 receivedQuery.id = expectedQuery.id
289 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
290 self.checkResponseNoEDNS(response, receivedResponse)
291
292 def testCookie(self):
293 """
294 EDNS Options: Cookie (adding ECS)
295 """
296 name = 'cookie.ednsoptions-ecs.tests.powerdns.com.'
297 eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
298 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
299 ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
300 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[eco,ecso], payload=512)
301 response = dns.message.make_response(query)
302 rrset = dns.rrset.from_text(name,
303 3600,
304 dns.rdataclass.IN,
305 dns.rdatatype.A,
306 '127.0.0.1')
307 response.answer.append(rrset)
308
309 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
310 self.assertTrue(receivedQuery)
311 self.assertTrue(receivedResponse)
312 receivedQuery.id = expectedQuery.id
313 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
314 self.checkResponseEDNSWithoutECS(response, receivedResponse)
315
316 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
317 self.assertTrue(receivedQuery)
318 self.assertTrue(receivedResponse)
319 receivedQuery.id = expectedQuery.id
320 self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
321 self.checkResponseEDNSWithoutECS(response, receivedResponse)
322
323 def testECS4(self):
324 """
325 EDNS Options: ECS4 (adding ECS)
326 """
327 name = 'ecs4.ednsoptions-ecs.tests.powerdns.com.'
328 ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4', 32)
329 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
330 ecsoResponse = clientsubnetoption.ClientSubnetOption('1.2.3.4', 24, scope=24)
331 response = dns.message.make_response(query)
332 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
333 rrset = dns.rrset.from_text(name,
334 3600,
335 dns.rdataclass.IN,
336 dns.rdatatype.A,
337 '127.0.0.1')
338 response.answer.append(rrset)
339
340 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
341 self.assertTrue(receivedQuery)
342 self.assertTrue(receivedResponse)
343 receivedQuery.id = query.id
344 self.checkQueryEDNSWithECS(query, receivedQuery)
345 self.checkResponseEDNSWithECS(response, receivedResponse)
346
347 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
348 self.assertTrue(receivedQuery)
349 self.assertTrue(receivedResponse)
350 receivedQuery.id = query.id
351 self.checkQueryEDNSWithECS(query, receivedQuery)
352 self.checkResponseEDNSWithECS(response, receivedResponse)
353
354 def testECS6(self):
355 """
356 EDNS Options: ECS6 (adding ECS)
357 """
358 name = 'ecs6.ednsoptions-ecs.tests.powerdns.com.'
359 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
360 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso])
361 ecsoResponse = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128, scope=56)
362 response = dns.message.make_response(query)
363 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
364 rrset = dns.rrset.from_text(name,
365 3600,
366 dns.rdataclass.IN,
367 dns.rdatatype.A,
368 '127.0.0.1')
369 response.answer.append(rrset)
370
371 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
372 self.assertTrue(receivedQuery)
373 self.assertTrue(receivedResponse)
374 receivedQuery.id = query.id
375 self.checkQueryEDNSWithECS(query, receivedQuery)
376 self.checkResponseEDNSWithECS(response, receivedResponse)
377
378 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
379 self.assertTrue(receivedQuery)
380 self.assertTrue(receivedResponse)
381 receivedQuery.id = query.id
382 self.checkQueryEDNSWithECS(query, receivedQuery)
383 self.checkResponseEDNSWithECS(response, receivedResponse)
384
385 def testECS6Cookie(self):
386 """
387 EDNS Options: Cookie + ECS6 (adding ECS)
388 """
389 name = 'cookie-ecs6.ednsoptions-ecs.tests.powerdns.com.'
390 eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
391 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
392 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
393 ecsoResponse = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128, scope=56)
394 response = dns.message.make_response(query)
395 response.use_edns(edns=True, payload=4096, options=[ecsoResponse])
396 rrset = dns.rrset.from_text(name,
397 3600,
398 dns.rdataclass.IN,
399 dns.rdatatype.A,
400 '127.0.0.1')
401 response.answer.append(rrset)
402
403 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
404 self.assertTrue(receivedQuery)
405 self.assertTrue(receivedResponse)
406 receivedQuery.id = query.id
407 self.checkQueryEDNSWithECS(query, receivedQuery, 1)
408 self.checkResponseEDNSWithECS(response, receivedResponse)
409
410 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
411 self.assertTrue(receivedQuery)
412 self.assertTrue(receivedResponse)
413 receivedQuery.id = query.id
414 self.checkQueryEDNSWithECS(query, receivedQuery, 1)
415 self.checkResponseEDNSWithECS(response, receivedResponse)
416
417 def testMultiCookiesECS6(self):
418 """
419 EDNS Options: Two Cookies + ECS6
420 """
421 name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
422 eco1 = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
423 ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
424 eco2 = cookiesoption.CookiesOption('deadc0de', 'deadc0de')
425 query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
426 response = dns.message.make_response(query)
427 rrset = dns.rrset.from_text(name,
428 3600,
429 dns.rdataclass.IN,
430 dns.rdatatype.A,
431 '127.0.0.1')
432 response.answer.append(rrset)
433
434 (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
435 self.assertTrue(receivedQuery)
436 self.assertTrue(receivedResponse)
437 receivedQuery.id = query.id
438 self.assertEquals(receivedQuery, query)
439 self.assertEquals(receivedResponse, response)
440
441 (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
442 self.assertTrue(receivedQuery)
443 self.assertTrue(receivedResponse)
444 receivedQuery.id = query.id
445 self.assertEquals(receivedQuery, query)
446 self.assertEquals(receivedResponse, response)