6 import clientsubnetoption
7 from dnsdisttests
import DNSDistTest
10 from io
import BytesIO
11 #from hyper import HTTP20Connection
12 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
14 @unittest.skipIf('SKIP_DOH_TESTS' in os
.environ
, 'DNS over HTTPS tests are disabled')
15 class DNSDistDOHTest(DNSDistTest
):
18 def getDOHGetURL(cls
, baseurl
, query
, rawQuery
=False):
22 wire
= query
.to_wire()
23 param
= base64
.urlsafe_b64encode(wire
).decode('UTF8').rstrip('=')
24 return baseurl
+ "?dns=" + param
27 def openDOHConnection(cls
, port
, caFile
, timeout
=2.0):
29 conn
.setopt(pycurl
.HTTP_VERSION
, pycurl
.CURL_HTTP_VERSION_2
)
31 conn
.setopt(pycurl
.HTTPHEADER
, ["Content-type: application/dns-message",
32 "Accept: application/dns-message"])
36 def sendDOHQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[], useHTTPS
=True):
37 url
= cls
.getDOHGetURL(baseurl
, query
, rawQuery
)
38 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
39 response_headers
= BytesIO()
40 #conn.setopt(pycurl.VERBOSE, True)
41 conn
.setopt(pycurl
.URL
, url
)
42 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
44 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
45 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
47 conn
.setopt(pycurl
.CAINFO
, caFile
)
49 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
50 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
53 cls
._toResponderQueue
.put(response
, True, timeout
)
57 cls
._response
_headers
= ''
58 data
= conn
.perform_rb()
59 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
60 if cls
._rcode
== 200 and not rawResponse
:
61 message
= dns
.message
.from_wire(data
)
65 if useQueue
and not cls
._fromResponderQueue
.empty():
66 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
68 cls
._response
_headers
= response_headers
.getvalue()
69 return (receivedQuery
, message
)
72 def sendDOHPostQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[], useHTTPS
=True):
74 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
75 response_headers
= BytesIO()
76 #conn.setopt(pycurl.VERBOSE, True)
77 conn
.setopt(pycurl
.URL
, url
)
78 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
80 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
81 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
83 conn
.setopt(pycurl
.CAINFO
, caFile
)
85 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
86 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
87 conn
.setopt(pycurl
.POST
, True)
92 conn
.setopt(pycurl
.POSTFIELDS
, data
)
95 cls
._toResponderQueue
.put(response
, True, timeout
)
99 cls
._response
_headers
= ''
100 data
= conn
.perform_rb()
101 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
102 if cls
._rcode
== 200 and not rawResponse
:
103 message
= dns
.message
.from_wire(data
)
107 if useQueue
and not cls
._fromResponderQueue
.empty():
108 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
110 cls
._response
_headers
= response_headers
.getvalue()
111 return (receivedQuery
, message
)
116 # for some reason, @unittest.skipIf() is not applied to derived classes with some versions of Python
117 if 'SKIP_DOH_TESTS' in os
.environ
:
118 raise unittest
.SkipTest('DNS over HTTPS tests are disabled')
120 cls
.startResponders()
124 print("Launching tests..")
127 # def openDOHConnection(cls, port, caFile, timeout=2.0):
128 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
129 # sslctx.load_verify_locations(caFile)
130 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
133 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
134 # url = cls.getDOHGetURL(baseurl, query)
137 # cls._toResponderQueue.put(response, True, timeout)
139 # conn.request('GET', url)
142 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
144 # data = conn.get_response()
148 # message = dns.message.from_wire(data)
150 # if useQueue and not cls._fromResponderQueue.empty():
151 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
152 # return (receivedQuery, message)
156 class TestDOH(DNSDistDOHTest
):
158 _serverKey
= 'server.key'
159 _serverCert
= 'server.chain'
160 _serverName
= 'tls.tests.dnsdist.org'
162 _dohServerPort
= 8443
163 _customResponseHeader1
= 'access-control-allow-origin: *'
164 _customResponseHeader2
= 'user-agent: derp'
165 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
166 _config_template
= """
167 newServer{address="127.0.0.1:%s"}
169 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/", "/coffee", "/PowerDNS", "/PowerDNS2", "/PowerDNS-999" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp",["UPPERCASE"]="VaLuE"}})
170 dohFE = getDOHFrontend(0)
171 dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['FoO']='bar'})})
173 addAction("drop.doh.tests.powerdns.com.", DropAction())
174 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
175 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
176 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
177 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
178 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
179 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
180 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
182 function dohHandler(dq)
183 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
184 local foundct = false
185 for key,value in pairs(dq:getHTTPHeaders()) do
186 if key == 'content-type' and value == 'application/dns-message' then
192 dq:setHTTPResponse(200, 'It works!', 'text/plain')
194 return DNSAction.HeaderModify
197 return DNSAction.None
199 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
201 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
203 def testDOHSimple(self
):
207 name
= 'simple.doh.tests.powerdns.com.'
208 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
210 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
212 response
= dns
.message
.make_response(query
)
213 rrset
= dns
.rrset
.from_text(name
,
218 response
.answer
.append(rrset
)
220 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
221 self
.assertTrue(receivedQuery
)
222 self
.assertTrue(receivedResponse
)
223 receivedQuery
.id = expectedQuery
.id
224 self
.assertEquals(expectedQuery
, receivedQuery
)
225 self
.assertTrue((self
._customResponseHeader
1) in self
._response
_headers
.decode())
226 self
.assertTrue((self
._customResponseHeader
2) in self
._response
_headers
.decode())
227 self
.assertFalse(('UPPERCASE: VaLuE' in self
._response
_headers
.decode()))
228 self
.assertTrue(('uppercase: VaLuE' in self
._response
_headers
.decode()))
229 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
230 self
.assertEquals(response
, receivedResponse
)
232 def testDOHSimplePOST(self
):
234 DOH: Simple POST query
236 name
= 'simple-post.doh.tests.powerdns.com.'
237 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
239 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
241 response
= dns
.message
.make_response(query
)
242 rrset
= dns
.rrset
.from_text(name
,
247 response
.answer
.append(rrset
)
249 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
250 self
.assertTrue(receivedQuery
)
251 self
.assertTrue(receivedResponse
)
252 receivedQuery
.id = expectedQuery
.id
253 self
.assertEquals(expectedQuery
, receivedQuery
)
254 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
255 self
.assertEquals(response
, receivedResponse
)
257 def testDOHExistingEDNS(self
):
261 name
= 'existing-edns.doh.tests.powerdns.com.'
262 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
264 response
= dns
.message
.make_response(query
)
265 rrset
= dns
.rrset
.from_text(name
,
270 response
.answer
.append(rrset
)
272 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
273 self
.assertTrue(receivedQuery
)
274 self
.assertTrue(receivedResponse
)
275 receivedQuery
.id = query
.id
276 self
.assertEquals(query
, receivedQuery
)
277 self
.assertEquals(response
, receivedResponse
)
278 self
.checkQueryEDNSWithoutECS(query
, receivedQuery
)
279 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
281 def testDOHExistingECS(self
):
283 DOH: Existing EDNS Client Subnet
285 name
= 'existing-ecs.doh.tests.powerdns.com.'
286 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
287 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
288 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
290 response
= dns
.message
.make_response(query
)
291 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
292 rrset
= dns
.rrset
.from_text(name
,
297 response
.answer
.append(rrset
)
299 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
300 self
.assertTrue(receivedQuery
)
301 self
.assertTrue(receivedResponse
)
302 receivedQuery
.id = query
.id
303 self
.assertEquals(query
, receivedQuery
)
304 self
.assertEquals(response
, receivedResponse
)
305 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
306 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
308 def testDropped(self
):
312 name
= 'drop.doh.tests.powerdns.com.'
313 query
= dns
.message
.make_query(name
, 'A', 'IN')
314 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
315 self
.assertEquals(receivedResponse
, None)
317 def testRefused(self
):
321 name
= 'refused.doh.tests.powerdns.com.'
322 query
= dns
.message
.make_query(name
, 'A', 'IN')
324 query
.flags
&= ~dns
.flags
.RD
325 expectedResponse
= dns
.message
.make_response(query
)
326 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
328 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
329 self
.assertEquals(receivedResponse
, expectedResponse
)
335 name
= 'spoof.doh.tests.powerdns.com.'
336 query
= dns
.message
.make_query(name
, 'A', 'IN')
338 query
.flags
&= ~dns
.flags
.RD
339 expectedResponse
= dns
.message
.make_response(query
)
340 rrset
= dns
.rrset
.from_text(name
,
345 expectedResponse
.answer
.append(rrset
)
347 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
348 self
.assertEquals(receivedResponse
, expectedResponse
)
350 def testDOHInvalid(self
):
354 name
= 'invalid.doh.tests.powerdns.com.'
355 invalidQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
357 # first an invalid query
358 invalidQuery
= invalidQuery
.to_wire()
359 invalidQuery
= invalidQuery
[:-5]
360 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=invalidQuery
, response
=None, useQueue
=False, rawQuery
=True)
361 self
.assertEquals(receivedResponse
, None)
363 # and now a valid one
364 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
366 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
368 response
= dns
.message
.make_response(query
)
369 rrset
= dns
.rrset
.from_text(name
,
374 response
.answer
.append(rrset
)
375 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
376 self
.assertTrue(receivedQuery
)
377 self
.assertTrue(receivedResponse
)
378 receivedQuery
.id = expectedQuery
.id
379 self
.assertEquals(expectedQuery
, receivedQuery
)
380 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
381 self
.assertEquals(response
, receivedResponse
)
383 def testDOHWithoutQuery(self
):
387 name
= 'empty-get.doh.tests.powerdns.com.'
388 url
= self
._dohBaseURL
389 conn
= self
.openDOHConnection(self
._dohServerPort
, self
._caCert
, timeout
=2.0)
390 conn
.setopt(pycurl
.URL
, url
)
391 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
392 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
393 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
394 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
395 data
= conn
.perform_rb()
396 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
397 self
.assertEquals(rcode
, 400)
399 def testDOHEmptyPOST(self
):
401 DOH: Empty POST query
403 name
= 'empty-post.doh.tests.powerdns.com.'
405 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
="", rawQuery
=True, response
=None, caFile
=self
._caCert
)
406 self
.assertEquals(receivedResponse
, None)
408 # and now a valid one
409 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
411 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
413 response
= dns
.message
.make_response(query
)
414 rrset
= dns
.rrset
.from_text(name
,
419 response
.answer
.append(rrset
)
420 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
421 self
.assertTrue(receivedQuery
)
422 self
.assertTrue(receivedResponse
)
423 receivedQuery
.id = expectedQuery
.id
424 self
.assertEquals(expectedQuery
, receivedQuery
)
425 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
426 self
.assertEquals(response
, receivedResponse
)
428 def testHeaderRule(self
):
432 name
= 'header-rule.doh.tests.powerdns.com.'
433 query
= dns
.message
.make_query(name
, 'A', 'IN')
435 query
.flags
&= ~dns
.flags
.RD
436 expectedResponse
= dns
.message
.make_response(query
)
437 rrset
= dns
.rrset
.from_text(name
,
442 expectedResponse
.answer
.append(rrset
)
444 # this header should match
445 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False, customHeaders
=['x-powerdnS: aaaaa'])
446 self
.assertEquals(receivedResponse
, expectedResponse
)
448 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
449 expectedQuery
.flags
&= ~dns
.flags
.RD
451 response
= dns
.message
.make_response(query
)
452 rrset
= dns
.rrset
.from_text(name
,
457 response
.answer
.append(rrset
)
459 # this content of the header should NOT match
460 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
, customHeaders
=['x-powerdnS: bbbbb'])
461 self
.assertTrue(receivedQuery
)
462 self
.assertTrue(receivedResponse
)
463 receivedQuery
.id = expectedQuery
.id
464 self
.assertEquals(expectedQuery
, receivedQuery
)
465 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
466 self
.assertEquals(response
, receivedResponse
)
468 def testHTTPPath(self
):
472 name
= 'http-path.doh.tests.powerdns.com.'
473 query
= dns
.message
.make_query(name
, 'A', 'IN')
475 query
.flags
&= ~dns
.flags
.RD
476 expectedResponse
= dns
.message
.make_response(query
)
477 rrset
= dns
.rrset
.from_text(name
,
482 expectedResponse
.answer
.append(rrset
)
484 # this path should match
485 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
486 self
.assertEquals(receivedResponse
, expectedResponse
)
488 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
490 expectedQuery
.flags
&= ~dns
.flags
.RD
491 response
= dns
.message
.make_response(query
)
492 rrset
= dns
.rrset
.from_text(name
,
497 response
.answer
.append(rrset
)
499 # this path should NOT match
500 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
501 self
.assertTrue(receivedQuery
)
502 self
.assertTrue(receivedResponse
)
503 receivedQuery
.id = expectedQuery
.id
504 self
.assertEquals(expectedQuery
, receivedQuery
)
505 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
506 self
.assertEquals(response
, receivedResponse
)
508 # this path is not in the URLs map and should lead to a 404
509 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS/something", query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
510 self
.assertTrue(receivedResponse
)
511 self
.assertEquals(receivedResponse
, b
'there is no endpoint configured for this path')
512 self
.assertEquals(self
._rcode
, 404)
514 def testHTTPPathRegex(self
):
518 name
= 'http-path-regex.doh.tests.powerdns.com.'
519 query
= dns
.message
.make_query(name
, 'A', 'IN')
521 query
.flags
&= ~dns
.flags
.RD
522 expectedResponse
= dns
.message
.make_response(query
)
523 rrset
= dns
.rrset
.from_text(name
,
528 expectedResponse
.answer
.append(rrset
)
530 # this path should match
531 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS-999', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
532 self
.assertEquals(receivedResponse
, expectedResponse
)
534 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
536 expectedQuery
.flags
&= ~dns
.flags
.RD
537 response
= dns
.message
.make_response(query
)
538 rrset
= dns
.rrset
.from_text(name
,
543 response
.answer
.append(rrset
)
545 # this path should NOT match
546 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
547 self
.assertTrue(receivedQuery
)
548 self
.assertTrue(receivedResponse
)
549 receivedQuery
.id = expectedQuery
.id
550 self
.assertEquals(expectedQuery
, receivedQuery
)
551 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
552 self
.assertEquals(response
, receivedResponse
)
554 def testHTTPStatusAction200(self
):
556 DOH: HTTPStatusAction 200 OK
558 name
= 'http-status-action.doh.tests.powerdns.com.'
559 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
562 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
563 self
.assertTrue(receivedResponse
)
564 self
.assertEquals(receivedResponse
, b
'Plaintext answer')
565 self
.assertEquals(self
._rcode
, 200)
566 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
568 def testHTTPStatusAction307(self
):
570 DOH: HTTPStatusAction 307
572 name
= 'http-status-action-redirect.doh.tests.powerdns.com.'
573 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
576 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
577 self
.assertTrue(receivedResponse
)
578 self
.assertEquals(self
._rcode
, 307)
579 self
.assertTrue('location: https://doh.powerdns.org' in self
._response
_headers
.decode())
581 def testHTTPLuaResponse(self
):
583 DOH: Lua HTTP Response
585 name
= 'http-lua.doh.tests.powerdns.com.'
586 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
589 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
590 self
.assertTrue(receivedResponse
)
591 self
.assertEquals(receivedResponse
, b
'It works!')
592 self
.assertEquals(self
._rcode
, 200)
593 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
595 def testHTTPEarlyResponse(self
):
597 DOH: HTTP Early Response
599 response_headers
= BytesIO()
600 url
= self
._dohBaseURL
+ 'coffee'
601 conn
= self
.openDOHConnection(self
._dohServerPort
, caFile
=self
._caCert
, timeout
=2.0)
602 conn
.setopt(pycurl
.URL
, url
)
603 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
604 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
605 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
606 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
607 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
608 data
= conn
.perform_rb()
609 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
610 headers
= response_headers
.getvalue().decode()
612 self
.assertEquals(rcode
, 418)
613 self
.assertEquals(data
, b
'C0FFEE')
614 self
.assertIn('foo: bar', headers
)
615 self
.assertNotIn(self
._customResponseHeader
2, headers
)
617 response_headers
= BytesIO()
618 conn
= self
.openDOHConnection(self
._dohServerPort
, caFile
=self
._caCert
, timeout
=2.0)
619 conn
.setopt(pycurl
.URL
, url
)
620 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
621 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
622 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
623 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
624 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
625 conn
.setopt(pycurl
.POST
, True)
627 conn
.setopt(pycurl
.POSTFIELDS
, data
)
629 data
= conn
.perform_rb()
630 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
631 headers
= response_headers
.getvalue().decode()
632 self
.assertEquals(rcode
, 418)
633 self
.assertEquals(data
, b
'C0FFEE')
634 self
.assertIn('foo: bar', headers
)
635 self
.assertNotIn(self
._customResponseHeader
2, headers
)
637 class TestDOHAddingECS(DNSDistDOHTest
):
639 _serverKey
= 'server.key'
640 _serverCert
= 'server.chain'
641 _serverName
= 'tls.tests.dnsdist.org'
643 _dohServerPort
= 8443
644 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
645 _config_template
= """
646 newServer{address="127.0.0.1:%s", useClientSubnet=true}
647 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
650 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
652 def testDOHSimple(self
):
654 DOH with ECS: Simple query
656 name
= 'simple.doh-ecs.tests.powerdns.com.'
657 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
659 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
660 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
661 response
= dns
.message
.make_response(query
)
662 rrset
= dns
.rrset
.from_text(name
,
667 response
.answer
.append(rrset
)
669 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
670 self
.assertTrue(receivedQuery
)
671 self
.assertTrue(receivedResponse
)
672 expectedQuery
.id = receivedQuery
.id
673 self
.assertEquals(expectedQuery
, receivedQuery
)
674 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
675 self
.assertEquals(response
, receivedResponse
)
676 self
.checkResponseNoEDNS(response
, receivedResponse
)
678 def testDOHExistingEDNS(self
):
680 DOH with ECS: Existing EDNS
682 name
= 'existing-edns.doh-ecs.tests.powerdns.com.'
683 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
685 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
686 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192, options
=[rewrittenEcso
])
687 response
= dns
.message
.make_response(query
)
688 rrset
= dns
.rrset
.from_text(name
,
693 response
.answer
.append(rrset
)
695 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
696 self
.assertTrue(receivedQuery
)
697 self
.assertTrue(receivedResponse
)
698 receivedQuery
.id = expectedQuery
.id
699 self
.assertEquals(expectedQuery
, receivedQuery
)
700 self
.assertEquals(response
, receivedResponse
)
701 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
702 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
704 def testDOHExistingECS(self
):
706 DOH with ECS: Existing EDNS Client Subnet
708 name
= 'existing-ecs.doh-ecs.tests.powerdns.com.'
709 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
710 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
711 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
713 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[rewrittenEcso
])
714 response
= dns
.message
.make_response(query
)
715 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
716 rrset
= dns
.rrset
.from_text(name
,
721 response
.answer
.append(rrset
)
723 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
724 self
.assertTrue(receivedQuery
)
725 self
.assertTrue(receivedResponse
)
726 receivedQuery
.id = expectedQuery
.id
727 self
.assertEquals(expectedQuery
, receivedQuery
)
728 self
.assertEquals(response
, receivedResponse
)
729 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
730 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
732 class TestDOHOverHTTP(DNSDistDOHTest
):
734 _dohServerPort
= 8480
735 _serverName
= 'tls.tests.dnsdist.org'
736 _dohBaseURL
= ("http://%s:%d/" % (_serverName
, _dohServerPort
))
737 _config_template
= """
738 newServer{address="127.0.0.1:%s"}
739 addDOHLocal("127.0.0.1:%s")
741 _config_params
= ['_testServerPort', '_dohServerPort']
742 _checkConfigExpectedOutput
= b
"""No certificate provided for DoH endpoint 127.0.0.1:8480, running in DNS over HTTP mode instead of DNS over HTTPS
743 Configuration 'configs/dnsdist_TestDOHOverHTTP.conf' OK!
746 def testDOHSimple(self
):
748 DOH over HTTP: Simple query
750 name
= 'simple.doh-over-http.tests.powerdns.com.'
751 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
753 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
754 response
= dns
.message
.make_response(query
)
755 rrset
= dns
.rrset
.from_text(name
,
760 response
.answer
.append(rrset
)
762 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
763 self
.assertTrue(receivedQuery
)
764 self
.assertTrue(receivedResponse
)
765 expectedQuery
.id = receivedQuery
.id
766 self
.assertEquals(expectedQuery
, receivedQuery
)
767 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
768 self
.assertEquals(response
, receivedResponse
)
769 self
.checkResponseNoEDNS(response
, receivedResponse
)
771 def testDOHSimplePOST(self
):
773 DOH over HTTP: Simple POST query
775 name
= 'simple-post.doh-over-http.tests.powerdns.com.'
776 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
778 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
780 response
= dns
.message
.make_response(query
)
781 rrset
= dns
.rrset
.from_text(name
,
786 response
.answer
.append(rrset
)
788 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
789 self
.assertTrue(receivedQuery
)
790 self
.assertTrue(receivedResponse
)
791 receivedQuery
.id = expectedQuery
.id
792 self
.assertEquals(expectedQuery
, receivedQuery
)
793 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
794 self
.assertEquals(response
, receivedResponse
)
795 self
.checkResponseNoEDNS(response
, receivedResponse
)
797 class TestDOHWithCache(DNSDistDOHTest
):
799 _serverKey
= 'server.key'
800 _serverCert
= 'server.chain'
801 _serverName
= 'tls.tests.dnsdist.org'
803 _dohServerPort
= 8443
804 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
805 _config_template
= """
806 newServer{address="127.0.0.1:%s"}
808 addDOHLocal("127.0.0.1:%s", "%s", "%s")
810 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
811 getPool(""):setCache(pc)
813 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
815 def testDOHCacheLargeAnswer(self
):
817 DOH with cache: Check that we can cache (and retrieve) large answers
820 name
= 'large.doh-with-cache.tests.powerdns.com.'
821 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
823 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
825 response
= dns
.message
.make_response(query
)
826 # we prepare a large answer
830 content
= content
+ ', '
831 content
= content
+ (str(i
)*50)
833 content
= content
+ 'A'*40
835 rrset
= dns
.rrset
.from_text(name
,
840 response
.answer
.append(rrset
)
841 self
.assertEquals(len(response
.to_wire()), 4096)
843 # first query to fill the cache
844 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
845 self
.assertTrue(receivedQuery
)
846 self
.assertTrue(receivedResponse
)
847 receivedQuery
.id = expectedQuery
.id
848 self
.assertEquals(expectedQuery
, receivedQuery
)
849 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
850 self
.assertEquals(response
, receivedResponse
)
852 for _
in range(numberOfQueries
):
853 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False)
854 self
.assertEquals(receivedResponse
, response
)