]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
6 import clientsubnetoption
7 from dnsdisttests
import DNSDistTest
10 from io
import BytesIO
11 #from hyper import HTTP20Connection
12 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
14 @unittest.skipIf('SKIP_DOH_TESTS' in os
.environ
, 'DNS over HTTPS tests are disabled')
15 class DNSDistDOHTest(DNSDistTest
):
def getDOHGetURL(cls, baseurl, query, rawQuery=False):
    """Build the URL of a DNS-over-HTTPS GET request carrying ``query``.

    Args:
        baseurl: URL of the DoH endpoint, without a query string.
        query: an object exposing ``to_wire()``, or, when ``rawQuery`` is
            true, the already serialized query as bytes.
        rawQuery: when true, ``query`` is used verbatim as the wire payload
            instead of being serialized with ``to_wire()``. This is what
            allows callers to send deliberately malformed queries.

    Returns:
        ``baseurl`` followed by ``?dns=`` and the URL-safe base64 encoding
        of the wire-format query, with the trailing ``=`` padding removed.
    """
    wire = query if rawQuery else query.to_wire()
    # The padding characters are stripped from the encoded parameter.
    param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
    return baseurl + "?dns=" + param
def openDOHConnection(cls, port, caFile, timeout=2.0):
    """Create a curl handle preconfigured for DNS-over-HTTPS requests.

    The handle is set to use HTTP/2 and to send the
    ``application/dns-message`` content-type and accept headers. The URL,
    name resolution and TLS options are left for the caller to set.

    Args:
        port: accepted for interface compatibility; not used here.
        caFile: accepted for interface compatibility; not used here.
        timeout: accepted for interface compatibility; not used here.

    Returns:
        The configured ``pycurl.Curl`` handle. Callers apply further
        ``setopt()`` calls to it and then run ``perform_rb()``.
    """
    # Callers use the return value as a curl handle, so it has to be
    # created here and handed back.
    conn = pycurl.Curl()
    conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
    conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
                                     "Accept: application/dns-message"])
    return conn
def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=[], useHTTPS=True):
    """Send ``query`` as a DoH GET request and collect the outcome.

    Side effects: stores the HTTP status code in ``cls._rcode`` and the raw
    response headers (bytes) in ``cls._response_headers``.

    Args:
        port: port of the DoH frontend; ``servername:port`` is pinned to
            127.0.0.1 through curl's RESOLVE option.
        servername: host name used in the RESOLVE entry.
        baseurl: URL of the DoH endpoint, without the ``dns`` parameter.
        query: the query to send (see ``rawQuery``).
        response: if not None, queued on ``cls._toResponderQueue`` before
            the request is performed.
        timeout: timeout, in seconds, for the queue operations.
        caFile: CA bundle used to verify the server; only applied when set.
        useQueue: whether to fetch the query from
            ``cls._fromResponderQueue`` after the exchange.
        rawQuery: passed to ``getDOHGetURL()``; ``query`` is then raw bytes.
        rawResponse: return the HTTP body as bytes instead of parsing it.
        customHeaders: request headers handed to curl's HTTPHEADER option.
            The default list is only read, never mutated.
        useHTTPS: whether to apply the TLS verification options.

    Returns:
        A ``(receivedQuery, message)`` tuple. ``receivedQuery`` is None when
        the queue is not used or is empty. ``message`` is the raw body when
        ``rawResponse`` is true, the parsed DNS message on HTTP 200, and
        None otherwise.
    """
    url = cls.getDOHGetURL(baseurl, query, rawQuery)
    conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
    response_headers = BytesIO()
    # Both results default to None so every path returns a defined tuple.
    receivedQuery = None
    message = None
    try:
        #conn.setopt(pycurl.VERBOSE, True)
        conn.setopt(pycurl.URL, url)
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
        if useHTTPS:
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            # Plain-HTTP callers pass no CA file; do not hand None to curl.
            if caFile:
                conn.setopt(pycurl.CAINFO, caFile)

        conn.setopt(pycurl.HTTPHEADER, customHeaders)
        conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)

        # Tests that expect no backend exchange pass response=None.
        if response is not None:
            cls._toResponderQueue.put(response, True, timeout)

        cls._response_headers = ''
        data = conn.perform_rb()
        cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
    finally:
        conn.close()

    if rawResponse:
        message = data
    elif cls._rcode == 200:
        message = dns.message.from_wire(data)

    if useQueue and not cls._fromResponderQueue.empty():
        receivedQuery = cls._fromResponderQueue.get(True, timeout)

    cls._response_headers = response_headers.getvalue()
    return (receivedQuery, message)
def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=[], useHTTPS=True):
    """Send ``query`` as a DoH POST request and collect the outcome.

    Side effects: stores the HTTP status code in ``cls._rcode`` and the raw
    response headers (bytes) in ``cls._response_headers``.

    Args:
        port: port of the DoH frontend; ``servername:port`` is pinned to
            127.0.0.1 through curl's RESOLVE option.
        servername: host name used in the RESOLVE entry.
        baseurl: URL the request is posted to.
        query: the query to send; serialized with ``to_wire()`` unless
            ``rawQuery`` is true, in which case it is posted as is.
        response: if not None, queued on ``cls._toResponderQueue`` before
            the request is performed.
        timeout: timeout, in seconds, for the queue operations.
        caFile: CA bundle used to verify the server; only applied when set.
        useQueue: whether to fetch the query from
            ``cls._fromResponderQueue`` after the exchange.
        rawQuery: post ``query`` verbatim instead of serializing it.
        rawResponse: return the HTTP body as bytes instead of parsing it.
        customHeaders: request headers handed to curl's HTTPHEADER option.
            The default list is only read, never mutated.
        useHTTPS: whether to apply the TLS verification options.

    Returns:
        A ``(receivedQuery, message)`` tuple. ``receivedQuery`` is None when
        the queue is not used or is empty. ``message`` is the raw body when
        ``rawResponse`` is true, the parsed DNS message on HTTP 200, and
        None otherwise.
    """
    # With POST the query travels in the request body, so the URL is the
    # endpoint itself.
    url = baseurl
    payload = query if rawQuery else query.to_wire()
    conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
    response_headers = BytesIO()
    # Both results default to None so every path returns a defined tuple.
    receivedQuery = None
    message = None
    try:
        #conn.setopt(pycurl.VERBOSE, True)
        conn.setopt(pycurl.URL, url)
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
        if useHTTPS:
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            # Plain-HTTP callers pass no CA file; do not hand None to curl.
            if caFile:
                conn.setopt(pycurl.CAINFO, caFile)

        conn.setopt(pycurl.HTTPHEADER, customHeaders)
        conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
        conn.setopt(pycurl.POST, True)
        conn.setopt(pycurl.POSTFIELDS, payload)

        # Tests that expect no backend exchange pass response=None.
        if response is not None:
            cls._toResponderQueue.put(response, True, timeout)

        cls._response_headers = ''
        data = conn.perform_rb()
        cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
    finally:
        conn.close()

    if rawResponse:
        message = data
    elif cls._rcode == 200:
        message = dns.message.from_wire(data)

    if useQueue and not cls._fromResponderQueue.empty():
        receivedQuery = cls._fromResponderQueue.get(True, timeout)

    cls._response_headers = response_headers.getvalue()
    return (receivedQuery, message)
116 # for some reason, @unittest.skipIf() is not applied to derived classes with some versions of Python
117 if 'SKIP_DOH_TESTS' in os
.environ
:
118 raise unittest
.SkipTest('DNS over HTTPS tests are disabled')
120 cls
.startResponders()
124 print("Launching tests..")
127 # def openDOHConnection(cls, port, caFile, timeout=2.0):
128 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
129 # sslctx.load_verify_locations(caFile)
130 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
133 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
134 # url = cls.getDOHGetURL(baseurl, query)
137 # cls._toResponderQueue.put(response, True, timeout)
139 # conn.request('GET', url)
142 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
144 # data = conn.get_response()
148 # message = dns.message.from_wire(data)
150 # if useQueue and not cls._fromResponderQueue.empty():
151 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
152 # return (receivedQuery, message)
156 class TestDOH(DNSDistDOHTest
):
158 _serverKey
= 'server.key'
159 _serverCert
= 'server.chain'
160 _serverName
= 'tls.tests.dnsdist.org'
162 _dohServerPort
= 8443
163 _customResponseHeader1
= 'access-control-allow-origin: *'
164 _customResponseHeader2
= 'user-agent: derp'
165 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
166 _config_template
= """
167 newServer{address="127.0.0.1:%s"}
169 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp",["UPPERCASE"]="VaLuE"}})
170 dohFE = getDOHFrontend(0)
171 dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['FoO']='bar'})})
173 addAction("drop.doh.tests.powerdns.com.", DropAction())
174 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
175 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
176 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
177 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
178 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
179 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
180 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
182 function dohHandler(dq)
183 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
184 local foundct = false
185 for key,value in pairs(dq:getHTTPHeaders()) do
186 if key == 'content-type' and value == 'application/dns-message' then
192 dq:setHTTPResponse(200, 'It works!', 'text/plain')
194 return DNSAction.HeaderModify
197 return DNSAction.None
199 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
201 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
203 def testDOHSimple(self
):
207 name
= 'simple.doh.tests.powerdns.com.'
208 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
210 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
212 response
= dns
.message
.make_response(query
)
213 rrset
= dns
.rrset
.from_text(name
,
218 response
.answer
.append(rrset
)
220 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
221 self
.assertTrue(receivedQuery
)
222 self
.assertTrue(receivedResponse
)
223 receivedQuery
.id = expectedQuery
.id
224 self
.assertEquals(expectedQuery
, receivedQuery
)
225 self
.assertTrue((self
._customResponseHeader
1) in self
._response
_headers
.decode())
226 self
.assertTrue((self
._customResponseHeader
2) in self
._response
_headers
.decode())
227 self
.assertFalse(('UPPERCASE: VaLuE' in self
._response
_headers
.decode()))
228 self
.assertTrue(('uppercase: VaLuE' in self
._response
_headers
.decode()))
229 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
230 self
.assertEquals(response
, receivedResponse
)
232 def testDOHSimplePOST(self
):
234 DOH: Simple POST query
236 name
= 'simple-post.doh.tests.powerdns.com.'
237 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
239 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
241 response
= dns
.message
.make_response(query
)
242 rrset
= dns
.rrset
.from_text(name
,
247 response
.answer
.append(rrset
)
249 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
250 self
.assertTrue(receivedQuery
)
251 self
.assertTrue(receivedResponse
)
252 receivedQuery
.id = expectedQuery
.id
253 self
.assertEquals(expectedQuery
, receivedQuery
)
254 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
255 self
.assertEquals(response
, receivedResponse
)
257 def testDOHExistingEDNS(self
):
261 name
= 'existing-edns.doh.tests.powerdns.com.'
262 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
264 response
= dns
.message
.make_response(query
)
265 rrset
= dns
.rrset
.from_text(name
,
270 response
.answer
.append(rrset
)
272 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
273 self
.assertTrue(receivedQuery
)
274 self
.assertTrue(receivedResponse
)
275 receivedQuery
.id = query
.id
276 self
.assertEquals(query
, receivedQuery
)
277 self
.assertEquals(response
, receivedResponse
)
278 self
.checkQueryEDNSWithoutECS(query
, receivedQuery
)
279 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
281 def testDOHExistingECS(self
):
283 DOH: Existing EDNS Client Subnet
285 name
= 'existing-ecs.doh.tests.powerdns.com.'
286 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
287 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
288 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
290 response
= dns
.message
.make_response(query
)
291 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
292 rrset
= dns
.rrset
.from_text(name
,
297 response
.answer
.append(rrset
)
299 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
300 self
.assertTrue(receivedQuery
)
301 self
.assertTrue(receivedResponse
)
302 receivedQuery
.id = query
.id
303 self
.assertEquals(query
, receivedQuery
)
304 self
.assertEquals(response
, receivedResponse
)
305 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
306 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
def testDropped(self):
    """
    DOH: Dropped query
    """
    # This name is matched by a DropAction in the configuration.
    name = 'drop.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
    # No DNS message is expected back for a dropped query.
    self.assertIsNone(receivedResponse)
def testRefused(self):
    """
    DOH: Refused
    """
    # This name is matched by RCodeAction(DNSRCode.REFUSED) in the
    # configuration, so the answer is built without involving the backend.
    name = 'refused.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
    self.assertEqual(receivedResponse, expectedResponse)
334 name
= 'spoof.doh.tests.powerdns.com.'
335 query
= dns
.message
.make_query(name
, 'A', 'IN')
337 query
.flags
&= ~dns
.flags
.RD
338 expectedResponse
= dns
.message
.make_response(query
)
339 rrset
= dns
.rrset
.from_text(name
,
344 expectedResponse
.answer
.append(rrset
)
346 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
347 self
.assertEquals(receivedResponse
, expectedResponse
)
349 def testDOHInvalid(self
):
353 name
= 'invalid.doh.tests.powerdns.com.'
354 invalidQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
356 # first an invalid query
357 invalidQuery
= invalidQuery
.to_wire()
358 invalidQuery
= invalidQuery
[:-5]
359 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=invalidQuery
, response
=None, useQueue
=False, rawQuery
=True)
360 self
.assertEquals(receivedResponse
, None)
362 # and now a valid one
363 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
365 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
367 response
= dns
.message
.make_response(query
)
368 rrset
= dns
.rrset
.from_text(name
,
373 response
.answer
.append(rrset
)
374 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
375 self
.assertTrue(receivedQuery
)
376 self
.assertTrue(receivedResponse
)
377 receivedQuery
.id = expectedQuery
.id
378 self
.assertEquals(expectedQuery
, receivedQuery
)
379 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
380 self
.assertEquals(response
, receivedResponse
)
def testDOHWithoutQuery(self):
    """
    DOH: Empty GET query
    """
    # Request the bare endpoint URL, without any query string.
    url = self._dohBaseURL
    conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
    try:
        conn.setopt(pycurl.URL, url)
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
        conn.setopt(pycurl.CAINFO, self._caCert)
        # The body is irrelevant here; only the status code is checked.
        conn.perform_rb()
        rcode = conn.getinfo(pycurl.RESPONSE_CODE)
    finally:
        conn.close()
    self.assertEqual(rcode, 400)
398 def testDOHEmptyPOST(self
):
400 DOH: Empty POST query
402 name
= 'empty-post.doh.tests.powerdns.com.'
404 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
="", rawQuery
=True, response
=None, caFile
=self
._caCert
)
405 self
.assertEquals(receivedResponse
, None)
407 # and now a valid one
408 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
410 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
412 response
= dns
.message
.make_response(query
)
413 rrset
= dns
.rrset
.from_text(name
,
418 response
.answer
.append(rrset
)
419 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
420 self
.assertTrue(receivedQuery
)
421 self
.assertTrue(receivedResponse
)
422 receivedQuery
.id = expectedQuery
.id
423 self
.assertEquals(expectedQuery
, receivedQuery
)
424 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
425 self
.assertEquals(response
, receivedResponse
)
427 def testHeaderRule(self
):
431 name
= 'header-rule.doh.tests.powerdns.com.'
432 query
= dns
.message
.make_query(name
, 'A', 'IN')
434 query
.flags
&= ~dns
.flags
.RD
435 expectedResponse
= dns
.message
.make_response(query
)
436 rrset
= dns
.rrset
.from_text(name
,
441 expectedResponse
.answer
.append(rrset
)
443 # this header should match
444 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False, customHeaders
=['x-powerdnS: aaaaa'])
445 self
.assertEquals(receivedResponse
, expectedResponse
)
447 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
448 expectedQuery
.flags
&= ~dns
.flags
.RD
450 response
= dns
.message
.make_response(query
)
451 rrset
= dns
.rrset
.from_text(name
,
456 response
.answer
.append(rrset
)
458 # this content of the header should NOT match
459 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
, customHeaders
=['x-powerdnS: bbbbb'])
460 self
.assertTrue(receivedQuery
)
461 self
.assertTrue(receivedResponse
)
462 receivedQuery
.id = expectedQuery
.id
463 self
.assertEquals(expectedQuery
, receivedQuery
)
464 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
465 self
.assertEquals(response
, receivedResponse
)
467 def testHTTPPath(self
):
471 name
= 'http-path.doh.tests.powerdns.com.'
472 query
= dns
.message
.make_query(name
, 'A', 'IN')
474 query
.flags
&= ~dns
.flags
.RD
475 expectedResponse
= dns
.message
.make_response(query
)
476 rrset
= dns
.rrset
.from_text(name
,
481 expectedResponse
.answer
.append(rrset
)
483 # this path should match
484 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
485 self
.assertEquals(receivedResponse
, expectedResponse
)
487 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
489 expectedQuery
.flags
&= ~dns
.flags
.RD
490 response
= dns
.message
.make_response(query
)
491 rrset
= dns
.rrset
.from_text(name
,
496 response
.answer
.append(rrset
)
498 # this path should NOT match
499 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
500 self
.assertTrue(receivedQuery
)
501 self
.assertTrue(receivedResponse
)
502 receivedQuery
.id = expectedQuery
.id
503 self
.assertEquals(expectedQuery
, receivedQuery
)
504 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
505 self
.assertEquals(response
, receivedResponse
)
507 def testHTTPPathRegex(self
):
511 name
= 'http-path-regex.doh.tests.powerdns.com.'
512 query
= dns
.message
.make_query(name
, 'A', 'IN')
514 query
.flags
&= ~dns
.flags
.RD
515 expectedResponse
= dns
.message
.make_response(query
)
516 rrset
= dns
.rrset
.from_text(name
,
521 expectedResponse
.answer
.append(rrset
)
523 # this path should match
524 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS-999', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
525 self
.assertEquals(receivedResponse
, expectedResponse
)
527 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
529 expectedQuery
.flags
&= ~dns
.flags
.RD
530 response
= dns
.message
.make_response(query
)
531 rrset
= dns
.rrset
.from_text(name
,
536 response
.answer
.append(rrset
)
538 # this path should NOT match
539 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
540 self
.assertTrue(receivedQuery
)
541 self
.assertTrue(receivedResponse
)
542 receivedQuery
.id = expectedQuery
.id
543 self
.assertEquals(expectedQuery
, receivedQuery
)
544 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
545 self
.assertEquals(response
, receivedResponse
)
def testHTTPStatusAction200(self):
    """
    DOH: HTTPStatusAction 200 OK
    """
    # This name is matched by
    # HTTPStatusAction(200, "Plaintext answer", "text/plain").
    name = 'http-status-action.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN', use_edns=False)

    # rawResponse=True: the body is plain text, not a DNS message.
    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
    self.assertTrue(receivedResponse)
    self.assertEqual(receivedResponse, b'Plaintext answer')
    self.assertEqual(self._rcode, 200)
    self.assertIn('content-type: text/plain', self._response_headers.decode())
def testHTTPStatusAction307(self):
    """
    DOH: HTTPStatusAction 307
    """
    # This name is matched by
    # HTTPStatusAction(307, "https://doh.powerdns.org").
    name = 'http-status-action-redirect.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN', use_edns=False)

    # rawResponse=True: the body of a redirect is not a DNS message.
    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
    self.assertTrue(receivedResponse)
    self.assertEqual(self._rcode, 307)
    self.assertIn('location: https://doh.powerdns.org', self._response_headers.decode())
def testHTTPLuaResponse(self):
    """
    DOH: Lua HTTP Response
    """
    # This name is routed to the dohHandler Lua function, which answers
    # with setHTTPResponse(200, 'It works!', 'text/plain').
    name = 'http-lua.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN', use_edns=False)

    # rawResponse=True: the body is plain text, not a DNS message.
    (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
    self.assertTrue(receivedResponse)
    self.assertEqual(receivedResponse, b'It works!')
    self.assertEqual(self._rcode, 200)
    self.assertIn('content-type: text/plain', self._response_headers.decode())
def testHTTPEarlyResponse(self):
    """
    DOH: HTTP Early Response
    """
    # '/coffee' is mapped by setResponsesMap() to a 418 answer with body
    # 'C0FFEE' and the extra header FoO: bar.
    url = self._dohBaseURL + 'coffee'

    def fetch(postBody=None):
        """Request ``url`` and return ``(status, body, headers)``.

        A GET is issued when ``postBody`` is None, otherwise a POST with
        ``postBody`` as payload.
        """
        response_headers = BytesIO()
        conn = self.openDOHConnection(self._dohServerPort, caFile=self._caCert, timeout=2.0)
        try:
            conn.setopt(pycurl.URL, url)
            conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            conn.setopt(pycurl.CAINFO, self._caCert)
            conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
            if postBody is not None:
                conn.setopt(pycurl.POST, True)
                conn.setopt(pycurl.POSTFIELDS, postBody)
            body = conn.perform_rb()
            status = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            conn.close()
        return (status, body, response_headers.getvalue().decode())

    def check(status, body, headers):
        """Assert that the mapped early response was returned."""
        self.assertEqual(status, 418)
        self.assertEqual(body, b'C0FFEE')
        self.assertIn('foo: bar', headers)
        # The frontend's custom response headers must not be added.
        self.assertNotIn(self._customResponseHeader2, headers)

    # First with a GET request...
    (rcode, data, headers) = fetch()
    check(rcode, data, headers)

    # ...then with a POST request.
    # NOTE(review): the POST payload reuses `data`, i.e. the body of the
    # GET response above - confirm this is the intended payload.
    (rcode, data, headers) = fetch(postBody=data)
    check(rcode, data, headers)
630 class TestDOHAddingECS(DNSDistDOHTest
):
632 _serverKey
= 'server.key'
633 _serverCert
= 'server.chain'
634 _serverName
= 'tls.tests.dnsdist.org'
636 _dohServerPort
= 8443
637 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
638 _config_template
= """
639 newServer{address="127.0.0.1:%s", useClientSubnet=true}
640 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
643 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
645 def testDOHSimple(self
):
647 DOH with ECS: Simple query
649 name
= 'simple.doh-ecs.tests.powerdns.com.'
650 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
652 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
653 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
654 response
= dns
.message
.make_response(query
)
655 rrset
= dns
.rrset
.from_text(name
,
660 response
.answer
.append(rrset
)
662 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
663 self
.assertTrue(receivedQuery
)
664 self
.assertTrue(receivedResponse
)
665 expectedQuery
.id = receivedQuery
.id
666 self
.assertEquals(expectedQuery
, receivedQuery
)
667 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
668 self
.assertEquals(response
, receivedResponse
)
669 self
.checkResponseNoEDNS(response
, receivedResponse
)
671 def testDOHExistingEDNS(self
):
673 DOH with ECS: Existing EDNS
675 name
= 'existing-edns.doh-ecs.tests.powerdns.com.'
676 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
678 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
679 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192, options
=[rewrittenEcso
])
680 response
= dns
.message
.make_response(query
)
681 rrset
= dns
.rrset
.from_text(name
,
686 response
.answer
.append(rrset
)
688 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
689 self
.assertTrue(receivedQuery
)
690 self
.assertTrue(receivedResponse
)
691 receivedQuery
.id = expectedQuery
.id
692 self
.assertEquals(expectedQuery
, receivedQuery
)
693 self
.assertEquals(response
, receivedResponse
)
694 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
695 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
697 def testDOHExistingECS(self
):
699 DOH with ECS: Existing EDNS Client Subnet
701 name
= 'existing-ecs.doh-ecs.tests.powerdns.com.'
702 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
703 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
704 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
706 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[rewrittenEcso
])
707 response
= dns
.message
.make_response(query
)
708 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
709 rrset
= dns
.rrset
.from_text(name
,
714 response
.answer
.append(rrset
)
716 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
717 self
.assertTrue(receivedQuery
)
718 self
.assertTrue(receivedResponse
)
719 receivedQuery
.id = expectedQuery
.id
720 self
.assertEquals(expectedQuery
, receivedQuery
)
721 self
.assertEquals(response
, receivedResponse
)
722 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
723 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
725 class TestDOHOverHTTP(DNSDistDOHTest
):
727 _dohServerPort
= 8480
728 _serverName
= 'tls.tests.dnsdist.org'
729 _dohBaseURL
= ("http://%s:%d/" % (_serverName
, _dohServerPort
))
730 _config_template
= """
731 newServer{address="127.0.0.1:%s"}
732 addDOHLocal("127.0.0.1:%s")
734 _config_params
= ['_testServerPort', '_dohServerPort']
736 def testDOHSimple(self
):
738 DOH over HTTP: Simple query
740 name
= 'simple.doh-over-http.tests.powerdns.com.'
741 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
743 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
744 response
= dns
.message
.make_response(query
)
745 rrset
= dns
.rrset
.from_text(name
,
750 response
.answer
.append(rrset
)
752 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
753 self
.assertTrue(receivedQuery
)
754 self
.assertTrue(receivedResponse
)
755 expectedQuery
.id = receivedQuery
.id
756 self
.assertEquals(expectedQuery
, receivedQuery
)
757 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
758 self
.assertEquals(response
, receivedResponse
)
759 self
.checkResponseNoEDNS(response
, receivedResponse
)
761 def testDOHSimplePOST(self
):
763 DOH over HTTP: Simple POST query
765 name
= 'simple-post.doh-over-http.tests.powerdns.com.'
766 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
768 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
770 response
= dns
.message
.make_response(query
)
771 rrset
= dns
.rrset
.from_text(name
,
776 response
.answer
.append(rrset
)
778 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
779 self
.assertTrue(receivedQuery
)
780 self
.assertTrue(receivedResponse
)
781 receivedQuery
.id = expectedQuery
.id
782 self
.assertEquals(expectedQuery
, receivedQuery
)
783 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
784 self
.assertEquals(response
, receivedResponse
)
785 self
.checkResponseNoEDNS(response
, receivedResponse
)
787 class TestDOHWithCache(DNSDistDOHTest
):
789 _serverKey
= 'server.key'
790 _serverCert
= 'server.chain'
791 _serverName
= 'tls.tests.dnsdist.org'
793 _dohServerPort
= 8443
794 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
795 _config_template
= """
796 newServer{address="127.0.0.1:%s"}
798 addDOHLocal("127.0.0.1:%s", "%s", "%s")
800 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
801 getPool(""):setCache(pc)
803 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
805 def testDOHCacheLargeAnswer(self
):
807 DOH with cache: Check that we can cache (and retrieve) large answers
810 name
= 'large.doh-with-cache.tests.powerdns.com.'
811 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
813 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
815 response
= dns
.message
.make_response(query
)
816 # we prepare a large answer
820 content
= content
+ ', '
821 content
= content
+ (str(i
)*50)
823 content
= content
+ 'A'*40
825 rrset
= dns
.rrset
.from_text(name
,
830 response
.answer
.append(rrset
)
831 self
.assertEquals(len(response
.to_wire()), 4096)
833 # first query to fill the cache
834 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
835 self
.assertTrue(receivedQuery
)
836 self
.assertTrue(receivedResponse
)
837 receivedQuery
.id = expectedQuery
.id
838 self
.assertEquals(expectedQuery
, receivedQuery
)
839 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
840 self
.assertEquals(response
, receivedResponse
)
842 for _
in range(numberOfQueries
):
843 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False)
844 self
.assertEquals(receivedResponse
, response
)