# Origin: thirdparty/pdns.git, regression-tests.dnsdist/test_DOH.py
# (copied from the git.ipfire.org gitweb blob view)
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
class DNSDistDOHTest(DNSDistTest):
    """Client-side DNS-over-HTTPS helpers shared by the DoH test classes.

    The helpers build a pycurl handle speaking HTTP/2, send a DNS query to
    dnsdist either as a GET (``?dns=`` parameter) or as a POST body, and
    return what the backend responder saw together with the parsed answer.
    """

    @classmethod
    def getDOHGetURL(cls, baseurl, query, rawQuery=False):
        """Return the GET URL carrying ``query`` in its ``dns`` parameter.

        :param baseurl: URL of the DoH endpoint, without query string.
        :param query: a message object exposing ``to_wire()``, or the
            wire-format bytes themselves when ``rawQuery`` is true.
        :param rawQuery: when true, ``query`` is used as-is instead of being
            serialized; this lets callers send deliberately broken payloads.
        :returns: ``baseurl`` + ``"?dns="`` + URL-safe base64 of the wire
            data with the trailing ``=`` padding stripped.
        """
        wire = query if rawQuery else query.to_wire()
        param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
        return baseurl + "?dns=" + param

    @classmethod
    def openDOHConnection(cls, port, caFile, timeout=2.0):
        """Create a pycurl handle configured for HTTP/2 DoH requests.

        ``port``, ``caFile`` and ``timeout`` are accepted for interface
        compatibility but are not applied here; callers set the URL,
        resolver pinning and CA file themselves.

        :returns: the new ``pycurl.Curl`` handle.
        """
        conn = pycurl.Curl()
        conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
        conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
                                        "Accept: application/dns-message"])
        return conn

    @classmethod
    def _setDOHTarget(cls, conn, url, servername, port, caFile):
        """Point ``conn`` at ``url`` and enable strict TLS verification."""
        conn.setopt(pycurl.URL, url)
        # Pin the server name to the loopback address so that the
        # certificate name can be verified against a local listener.
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
        if caFile:
            conn.setopt(pycurl.CAINFO, caFile)

    @classmethod
    def _performDOHExchange(cls, conn, response, timeout, useQueue):
        """Run the prepared request and collect both sides of the exchange.

        :returns: ``(receivedQuery, message)``; ``receivedQuery`` is what
            the responder queue handed back (``None`` when ``useQueue`` is
            false or the queue is empty), ``message`` is the parsed DNS
            answer, or ``None`` when the HTTP status is not 200.
        """
        receivedQuery = None
        message = None

        try:
            if response:
                # Only queue an answer when the backend is expected to be hit.
                cls._toResponderQueue.put(response, True, timeout)

            data = conn.perform_rb()
            rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            # Release the curl handle even if the transfer raised.
            conn.close()

        if rcode == 200:
            message = dns.message.from_wire(data)

        if useQueue and not cls._fromResponderQueue.empty():
            receivedQuery = cls._fromResponderQueue.get(True, timeout)

        return (receivedQuery, message)

    @classmethod
    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, customHeaders=None):
        """Send ``query`` over DoH using GET and return the outcome.

        :param port: TCP port of the DoH listener.
        :param servername: TLS server name, pinned to 127.0.0.1.
        :param baseurl: URL of the DoH endpoint.
        :param query: message object, or raw bytes when ``rawQuery`` is true.
        :param response: answer to hand to the backend responder, if any.
        :param timeout: timeout used for the responder queue operations.
        :param caFile: CA bundle used to verify the server, if any.
        :param useQueue: whether to fetch the query seen by the backend.
        :param rawQuery: pass ``query`` through without serializing it.
        :param customHeaders: list of ``"Name: value"`` request headers.
        :returns: ``(receivedQuery, message)``, either of which may be ``None``.
        """
        headers = [] if customHeaders is None else customHeaders
        url = cls.getDOHGetURL(baseurl, query, rawQuery)
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        #conn.setopt(pycurl.VERBOSE, True)
        cls._setDOHTarget(conn, url, servername, port, caFile)
        # Always applied, even when empty, exactly as before: this
        # supersedes the header list installed by openDOHConnection().
        conn.setopt(pycurl.HTTPHEADER, headers)
        return cls._performDOHExchange(conn, response, timeout, useQueue)

    @classmethod
    def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False):
        """Send ``query`` over DoH using POST and return the outcome.

        The parameters and return value mirror :meth:`sendDOHQuery`, except
        that the wire data travels in the request body and the headers set
        by :meth:`openDOHConnection` are kept.
        """
        data = query if rawQuery else query.to_wire()
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        #conn.setopt(pycurl.VERBOSE, True)
        cls._setDOHTarget(conn, baseurl, servername, port, caFile)
        conn.setopt(pycurl.POST, True)
        conn.setopt(pycurl.POSTFIELDS, data)
        return cls._performDOHExchange(conn, response, timeout, useQueue)
95 # def openDOHConnection(cls, port, caFile, timeout=2.0):
96 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
97 # sslctx.load_verify_locations(caFile)
98 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
101 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
102 # url = cls.getDOHGetURL(baseurl, query)
105 # cls._toResponderQueue.put(response, True, timeout)
107 # conn.request('GET', url)
110 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
112 # data = conn.get_response()
116 # message = dns.message.from_wire(data)
118 # if useQueue and not cls._fromResponderQueue.empty():
119 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
120 # return (receivedQuery, message)
class TestDOH(DNSDistDOHTest):
    """DoH regression tests against a dnsdist with drop/refuse/spoof rules."""

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    # NOTE(review): every test reads self._caCert, but no assignment is
    # visible in this class; presumably inherited or defined on a line
    # missing from the reviewed copy -- confirm.
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })

    addAction("drop.doh.tests.powerdns.com.", DropAction())
    addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
    addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
    """
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    @staticmethod
    def _answerFor(name, address='127.0.0.1'):
        """Return an IN/A rrset for ``name`` pointing at ``address``.

        NOTE(review): the TTL/class/type/address arguments were missing
        from the reviewed copy. For backend answers the content is fixture
        data that the test itself supplies; for spoofed answers the address
        comes from the matching SpoofAction in ``_config_template``. The
        TTL value is a guess -- confirm against the upstream file.
        """
        return dns.rrset.from_text(name,
                                   3600,
                                   dns.rdataclass.IN,
                                   dns.rdatatype.A,
                                   address)

    def testDOHSimple(self):
        """
        DOH: Simple query
        """
        name = 'simple.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        # The backend is expected to see the query with EDNS added.
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        response.answer.append(self._answerFor(name))

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL, query,
            response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHSimplePOST(self):
        """
        DOH: Simple POST query
        """
        name = 'simple-post.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        response.answer.append(self._answerFor(name))

        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL, query,
            response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH: Existing EDNS
        """
        name = 'existing-edns.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        response = dns.message.make_response(query)
        response.answer.append(self._answerFor(name))

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL, query,
            response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithoutECS(query, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        response.answer.append(self._answerFor(name))

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL, query,
            response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(query, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)

    def testDropped(self):
        """
        DOH: Dropped query
        """
        name = 'drop.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        (_, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

    def testRefused(self):
        """
        DOH: Refused
        """
        name = 'refused.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testSpoof(self):
        """
        DOH: Spoofing
        """
        # NOTE(review): the "def" line of this scenario was missing from the
        # reviewed copy; the method name is inferred from the sibling tests.
        name = 'spoof.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared before the expected response is derived from the query.
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(self._answerFor(name, '1.2.3.4'))

        (_, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDOHInvalid(self):
        """
        DOH: Invalid query
        """
        name = 'invalid.doh.tests.powerdns.com.'

        # first an invalid query: a valid message with its tail cut off
        invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False).to_wire()[:-5]
        (_, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            caFile=self._caCert, query=invalidQuery, response=None,
            useQueue=False, rawQuery=True)
        self.assertIsNone(receivedResponse)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        response.answer.append(self._answerFor(name))
        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL, query,
            response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHWithoutQuery(self):
        """
        DOH: Empty GET query
        """
        url = self._dohBaseURL
        conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
        try:
            conn.setopt(pycurl.URL, url)
            conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            conn.setopt(pycurl.CAINFO, self._caCert)
            # The body is irrelevant here, only the HTTP status matters.
            conn.perform_rb()
            rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            conn.close()
        self.assertEqual(rcode, 400)

    def testDOHEmptyPOST(self):
        """
        DOH: Empty POST query
        """
        name = 'empty-post.doh.tests.powerdns.com.'

        (_, receivedResponse) = self.sendDOHPostQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            query="", rawQuery=True, response=None, caFile=self._caCert)
        self.assertIsNone(receivedResponse)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        response.answer.append(self._answerFor(name))
        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL, query,
            response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHeaderRule(self):
        """
        DOH: HeaderRule
        """
        name = 'header-rule.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(self._answerFor(name, '2.3.4.5'))

        # this header should match
        (_, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL,
            caFile=self._caCert, query=query, response=None, useQueue=False,
            customHeaders=['x-powerdnS: aaaaa'])
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        response.answer.append(self._answerFor(name))

        # this content of the header should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL, query,
            response=response, caFile=self._caCert,
            customHeaders=['x-powerdnS: bbbbb'])
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPPath(self):
        """
        DOH: HTTPPath
        """
        name = 'http-path.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.answer.append(self._answerFor(name, '3.4.5.6'))

        # this path should match
        (_, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS',
            caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        response.answer.append(self._answerFor(name))

        # this path should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query,
            response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
class TestDOHAddingECS(DNSDistDOHTest):
    """DoH regression tests with a backend configured with useClientSubnet."""

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    # NOTE(review): every test reads self._caCert, but no assignment is
    # visible in this class; presumably inherited or defined on a line
    # missing from the reviewed copy -- confirm.
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    # NOTE(review): setECSOverride(true) was not visible in the reviewed
    # copy. testDOHExistingECS expects a client-supplied ECS option to be
    # replaced on the way to the backend, which presumably needs it --
    # confirm against the upstream file.
    _config_template = """
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
    setECSOverride(true)
    """
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    @staticmethod
    def _answerFor(name):
        """Return the IN/A rrset used as backend answer for ``name``.

        NOTE(review): the TTL/class/type/address arguments were missing
        from the reviewed copy; the answer is fixture data supplied by the
        test itself, so the values below are placeholders -- confirm.
        """
        return dns.rrset.from_text(name,
                                   3600,
                                   dns.rdataclass.IN,
                                   dns.rdatatype.A,
                                   '127.0.0.1')

    def testDOHSimple(self):
        """
        DOH with ECS: Simple query
        """
        name = 'simple.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        # The backend is expected to see EDNS with the ECS option added.
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.answer.append(self._answerFor(name))

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL, query,
            response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH with ECS: Existing EDNS
        """
        name = 'existing-edns.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.answer.append(self._answerFor(name))

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL, query,
            response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH with ECS: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        response.answer.append(self._answerFor(name))

        (receivedQuery, receivedResponse) = self.sendDOHQuery(
            self._dohServerPort, self._serverName, self._dohBaseURL, query,
            response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)