]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
12 class DNSDistDOHTest(DNSDistTest
):
15 def getDOHGetURL(cls
, baseurl
, query
, rawQuery
=False):
19 wire
= query
.to_wire()
20 param
= base64
.urlsafe_b64encode(wire
).decode('UTF8').rstrip('=')
21 return baseurl
+ "?dns=" + param
24 def openDOHConnection(cls
, port
, caFile
, timeout
=2.0):
26 conn
.setopt(pycurl
.HTTP_VERSION
, pycurl
.CURL_HTTP_VERSION_2
)
28 conn
.setopt(pycurl
.HTTPHEADER
, ["Content-type: application/dns-message",
29 "Accept: application/dns-message"])
33 def sendDOHQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[], useHTTPS
=True):
34 url
= cls
.getDOHGetURL(baseurl
, query
, rawQuery
)
35 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
36 response_headers
= BytesIO()
37 #conn.setopt(pycurl.VERBOSE, True)
38 conn
.setopt(pycurl
.URL
, url
)
39 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
41 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
42 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
44 conn
.setopt(pycurl
.CAINFO
, caFile
)
46 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
47 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
50 cls
._toResponderQueue
.put(response
, True, timeout
)
54 cls
._response
_headers
= ''
55 data
= conn
.perform_rb()
56 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
57 if cls
._rcode
== 200 and not rawResponse
:
58 message
= dns
.message
.from_wire(data
)
62 if useQueue
and not cls
._fromResponderQueue
.empty():
63 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
65 cls
._response
_headers
= response_headers
.getvalue()
66 return (receivedQuery
, message
)
69 def sendDOHPostQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[], useHTTPS
=True):
71 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
72 response_headers
= BytesIO()
73 #conn.setopt(pycurl.VERBOSE, True)
74 conn
.setopt(pycurl
.URL
, url
)
75 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
77 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
78 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
80 conn
.setopt(pycurl
.CAINFO
, caFile
)
82 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
83 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
84 conn
.setopt(pycurl
.POST
, True)
89 conn
.setopt(pycurl
.POSTFIELDS
, data
)
92 cls
._toResponderQueue
.put(response
, True, timeout
)
96 cls
._response
_headers
= ''
97 data
= conn
.perform_rb()
98 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
99 if cls
._rcode
== 200 and not rawResponse
:
100 message
= dns
.message
.from_wire(data
)
104 if useQueue
and not cls
._fromResponderQueue
.empty():
105 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
107 cls
._response
_headers
= response_headers
.getvalue()
108 return (receivedQuery
, message
)
111 # def openDOHConnection(cls, port, caFile, timeout=2.0):
112 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
113 # sslctx.load_verify_locations(caFile)
114 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
117 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
118 # url = cls.getDOHGetURL(baseurl, query)
121 # cls._toResponderQueue.put(response, True, timeout)
123 # conn.request('GET', url)
126 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
128 # data = conn.get_response()
132 # message = dns.message.from_wire(data)
134 # if useQueue and not cls._fromResponderQueue.empty():
135 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
136 # return (receivedQuery, message)
140 class TestDOH(DNSDistDOHTest
):
142 _serverKey
= 'server.key'
143 _serverCert
= 'server.chain'
144 _serverName
= 'tls.tests.dnsdist.org'
146 _dohServerPort
= 8443
147 _customResponseHeader1
= 'access-control-allow-origin: *'
148 _customResponseHeader2
= 'user-agent: derp'
149 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
150 _config_template
= """
151 newServer{address="127.0.0.1:%s"}
153 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp"}})
154 dohFE = getDOHFrontend(0)
155 dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['foo']='bar'})})
157 addAction("drop.doh.tests.powerdns.com.", DropAction())
158 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
159 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
160 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
161 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
162 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
163 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
164 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
166 function dohHandler(dq)
167 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
168 local foundct = false
169 for key,value in pairs(dq:getHTTPHeaders()) do
170 if key == 'content-type' and value == 'application/dns-message' then
176 dq:setHTTPResponse(200, 'It works!', 'text/plain')
178 return DNSAction.HeaderModify
181 return DNSAction.None
183 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
185 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
187 def testDOHSimple(self
):
191 name
= 'simple.doh.tests.powerdns.com.'
192 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
194 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
196 response
= dns
.message
.make_response(query
)
197 rrset
= dns
.rrset
.from_text(name
,
202 response
.answer
.append(rrset
)
204 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
205 self
.assertTrue(receivedQuery
)
206 self
.assertTrue(receivedResponse
)
207 receivedQuery
.id = expectedQuery
.id
208 self
.assertEquals(expectedQuery
, receivedQuery
)
209 self
.assertTrue((self
._customResponseHeader
1) in self
._response
_headers
.decode())
210 self
.assertTrue((self
._customResponseHeader
2) in self
._response
_headers
.decode())
211 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
212 self
.assertEquals(response
, receivedResponse
)
214 def testDOHSimplePOST(self
):
216 DOH: Simple POST query
218 name
= 'simple-post.doh.tests.powerdns.com.'
219 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
221 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
223 response
= dns
.message
.make_response(query
)
224 rrset
= dns
.rrset
.from_text(name
,
229 response
.answer
.append(rrset
)
231 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
232 self
.assertTrue(receivedQuery
)
233 self
.assertTrue(receivedResponse
)
234 receivedQuery
.id = expectedQuery
.id
235 self
.assertEquals(expectedQuery
, receivedQuery
)
236 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
237 self
.assertEquals(response
, receivedResponse
)
239 def testDOHExistingEDNS(self
):
243 name
= 'existing-edns.doh.tests.powerdns.com.'
244 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
246 response
= dns
.message
.make_response(query
)
247 rrset
= dns
.rrset
.from_text(name
,
252 response
.answer
.append(rrset
)
254 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
255 self
.assertTrue(receivedQuery
)
256 self
.assertTrue(receivedResponse
)
257 receivedQuery
.id = query
.id
258 self
.assertEquals(query
, receivedQuery
)
259 self
.assertEquals(response
, receivedResponse
)
260 self
.checkQueryEDNSWithoutECS(query
, receivedQuery
)
261 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
263 def testDOHExistingECS(self
):
265 DOH: Existing EDNS Client Subnet
267 name
= 'existing-ecs.doh.tests.powerdns.com.'
268 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
269 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
270 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
272 response
= dns
.message
.make_response(query
)
273 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
274 rrset
= dns
.rrset
.from_text(name
,
279 response
.answer
.append(rrset
)
281 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
282 self
.assertTrue(receivedQuery
)
283 self
.assertTrue(receivedResponse
)
284 receivedQuery
.id = query
.id
285 self
.assertEquals(query
, receivedQuery
)
286 self
.assertEquals(response
, receivedResponse
)
287 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
288 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
290 def testDropped(self
):
294 name
= 'drop.doh.tests.powerdns.com.'
295 query
= dns
.message
.make_query(name
, 'A', 'IN')
296 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
297 self
.assertEquals(receivedResponse
, None)
299 def testRefused(self
):
303 name
= 'refused.doh.tests.powerdns.com.'
304 query
= dns
.message
.make_query(name
, 'A', 'IN')
306 expectedResponse
= dns
.message
.make_response(query
)
307 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
309 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
310 self
.assertEquals(receivedResponse
, expectedResponse
)
316 name
= 'spoof.doh.tests.powerdns.com.'
317 query
= dns
.message
.make_query(name
, 'A', 'IN')
319 query
.flags
&= ~dns
.flags
.RD
320 expectedResponse
= dns
.message
.make_response(query
)
321 rrset
= dns
.rrset
.from_text(name
,
326 expectedResponse
.answer
.append(rrset
)
328 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
329 self
.assertEquals(receivedResponse
, expectedResponse
)
331 def testDOHInvalid(self
):
335 name
= 'invalid.doh.tests.powerdns.com.'
336 invalidQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
338 # first an invalid query
339 invalidQuery
= invalidQuery
.to_wire()
340 invalidQuery
= invalidQuery
[:-5]
341 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=invalidQuery
, response
=None, useQueue
=False, rawQuery
=True)
342 self
.assertEquals(receivedResponse
, None)
344 # and now a valid one
345 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
347 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
349 response
= dns
.message
.make_response(query
)
350 rrset
= dns
.rrset
.from_text(name
,
355 response
.answer
.append(rrset
)
356 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
357 self
.assertTrue(receivedQuery
)
358 self
.assertTrue(receivedResponse
)
359 receivedQuery
.id = expectedQuery
.id
360 self
.assertEquals(expectedQuery
, receivedQuery
)
361 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
362 self
.assertEquals(response
, receivedResponse
)
364 def testDOHWithoutQuery(self
):
368 name
= 'empty-get.doh.tests.powerdns.com.'
369 url
= self
._dohBaseURL
370 conn
= self
.openDOHConnection(self
._dohServerPort
, self
._caCert
, timeout
=2.0)
371 conn
.setopt(pycurl
.URL
, url
)
372 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
373 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
374 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
375 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
376 data
= conn
.perform_rb()
377 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
378 self
.assertEquals(rcode
, 400)
380 def testDOHEmptyPOST(self
):
382 DOH: Empty POST query
384 name
= 'empty-post.doh.tests.powerdns.com.'
386 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
="", rawQuery
=True, response
=None, caFile
=self
._caCert
)
387 self
.assertEquals(receivedResponse
, None)
389 # and now a valid one
390 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
392 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
394 response
= dns
.message
.make_response(query
)
395 rrset
= dns
.rrset
.from_text(name
,
400 response
.answer
.append(rrset
)
401 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
402 self
.assertTrue(receivedQuery
)
403 self
.assertTrue(receivedResponse
)
404 receivedQuery
.id = expectedQuery
.id
405 self
.assertEquals(expectedQuery
, receivedQuery
)
406 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
407 self
.assertEquals(response
, receivedResponse
)
409 def testHeaderRule(self
):
413 name
= 'header-rule.doh.tests.powerdns.com.'
414 query
= dns
.message
.make_query(name
, 'A', 'IN')
416 query
.flags
&= ~dns
.flags
.RD
417 expectedResponse
= dns
.message
.make_response(query
)
418 rrset
= dns
.rrset
.from_text(name
,
423 expectedResponse
.answer
.append(rrset
)
425 # this header should match
426 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False, customHeaders
=['x-powerdnS: aaaaa'])
427 self
.assertEquals(receivedResponse
, expectedResponse
)
429 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
430 expectedQuery
.flags
&= ~dns
.flags
.RD
432 response
= dns
.message
.make_response(query
)
433 rrset
= dns
.rrset
.from_text(name
,
438 response
.answer
.append(rrset
)
440 # this content of the header should NOT match
441 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
, customHeaders
=['x-powerdnS: bbbbb'])
442 self
.assertTrue(receivedQuery
)
443 self
.assertTrue(receivedResponse
)
444 receivedQuery
.id = expectedQuery
.id
445 self
.assertEquals(expectedQuery
, receivedQuery
)
446 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
447 self
.assertEquals(response
, receivedResponse
)
449 def testHTTPPath(self
):
453 name
= 'http-path.doh.tests.powerdns.com.'
454 query
= dns
.message
.make_query(name
, 'A', 'IN')
456 query
.flags
&= ~dns
.flags
.RD
457 expectedResponse
= dns
.message
.make_response(query
)
458 rrset
= dns
.rrset
.from_text(name
,
463 expectedResponse
.answer
.append(rrset
)
465 # this path should match
466 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
467 self
.assertEquals(receivedResponse
, expectedResponse
)
469 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
471 expectedQuery
.flags
&= ~dns
.flags
.RD
472 response
= dns
.message
.make_response(query
)
473 rrset
= dns
.rrset
.from_text(name
,
478 response
.answer
.append(rrset
)
480 # this path should NOT match
481 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
482 self
.assertTrue(receivedQuery
)
483 self
.assertTrue(receivedResponse
)
484 receivedQuery
.id = expectedQuery
.id
485 self
.assertEquals(expectedQuery
, receivedQuery
)
486 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
487 self
.assertEquals(response
, receivedResponse
)
489 def testHTTPPathRegex(self
):
493 name
= 'http-path-regex.doh.tests.powerdns.com.'
494 query
= dns
.message
.make_query(name
, 'A', 'IN')
496 query
.flags
&= ~dns
.flags
.RD
497 expectedResponse
= dns
.message
.make_response(query
)
498 rrset
= dns
.rrset
.from_text(name
,
503 expectedResponse
.answer
.append(rrset
)
505 # this path should match
506 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS-999', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
507 self
.assertEquals(receivedResponse
, expectedResponse
)
509 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
511 expectedQuery
.flags
&= ~dns
.flags
.RD
512 response
= dns
.message
.make_response(query
)
513 rrset
= dns
.rrset
.from_text(name
,
518 response
.answer
.append(rrset
)
520 # this path should NOT match
521 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
522 self
.assertTrue(receivedQuery
)
523 self
.assertTrue(receivedResponse
)
524 receivedQuery
.id = expectedQuery
.id
525 self
.assertEquals(expectedQuery
, receivedQuery
)
526 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
527 self
.assertEquals(response
, receivedResponse
)
529 def testHTTPStatusAction200(self
):
531 DOH: HTTPStatusAction 200 OK
533 name
= 'http-status-action.doh.tests.powerdns.com.'
534 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
537 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
538 self
.assertTrue(receivedResponse
)
539 self
.assertEquals(receivedResponse
, b
'Plaintext answer')
540 self
.assertEquals(self
._rcode
, 200)
541 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
543 def testHTTPStatusAction307(self
):
545 DOH: HTTPStatusAction 307
547 name
= 'http-status-action-redirect.doh.tests.powerdns.com.'
548 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
551 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
552 self
.assertTrue(receivedResponse
)
553 self
.assertEquals(self
._rcode
, 307)
554 self
.assertTrue('location: https://doh.powerdns.org' in self
._response
_headers
.decode())
556 def testHTTPLuaResponse(self
):
558 DOH: Lua HTTP Response
560 name
= 'http-lua.doh.tests.powerdns.com.'
561 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
564 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
565 self
.assertTrue(receivedResponse
)
566 self
.assertEquals(receivedResponse
, b
'It works!')
567 self
.assertEquals(self
._rcode
, 200)
568 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
570 def testHTTPEarlyResponse(self
):
572 DOH: HTTP Early Response
574 response_headers
= BytesIO()
575 url
= self
._dohBaseURL
+ 'coffee'
576 conn
= self
.openDOHConnection(self
._dohServerPort
, caFile
=self
._caCert
, timeout
=2.0)
577 conn
.setopt(pycurl
.URL
, url
)
578 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
579 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
580 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
581 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
582 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
583 data
= conn
.perform_rb()
584 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
585 headers
= response_headers
.getvalue().decode()
587 self
.assertEquals(rcode
, 418)
588 self
.assertEquals(data
, b
'C0FFEE')
589 self
.assertIn('foo: bar', headers
)
590 self
.assertNotIn(self
._customResponseHeader
2, headers
)
592 response_headers
= BytesIO()
593 conn
= self
.openDOHConnection(self
._dohServerPort
, caFile
=self
._caCert
, timeout
=2.0)
594 conn
.setopt(pycurl
.URL
, url
)
595 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
596 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
597 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
598 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
599 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
600 conn
.setopt(pycurl
.POST
, True)
602 conn
.setopt(pycurl
.POSTFIELDS
, data
)
604 data
= conn
.perform_rb()
605 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
606 headers
= response_headers
.getvalue().decode()
607 self
.assertEquals(rcode
, 418)
608 self
.assertEquals(data
, b
'C0FFEE')
609 self
.assertIn('foo: bar', headers
)
610 self
.assertNotIn(self
._customResponseHeader
2, headers
)
612 class TestDOHAddingECS(DNSDistDOHTest
):
614 _serverKey
= 'server.key'
615 _serverCert
= 'server.chain'
616 _serverName
= 'tls.tests.dnsdist.org'
618 _dohServerPort
= 8443
619 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
620 _config_template
= """
621 newServer{address="127.0.0.1:%s", useClientSubnet=true}
622 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
625 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
627 def testDOHSimple(self
):
629 DOH with ECS: Simple query
631 name
= 'simple.doh-ecs.tests.powerdns.com.'
632 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
634 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
635 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
636 response
= dns
.message
.make_response(query
)
637 rrset
= dns
.rrset
.from_text(name
,
642 response
.answer
.append(rrset
)
644 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
645 self
.assertTrue(receivedQuery
)
646 self
.assertTrue(receivedResponse
)
647 expectedQuery
.id = receivedQuery
.id
648 self
.assertEquals(expectedQuery
, receivedQuery
)
649 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
650 self
.assertEquals(response
, receivedResponse
)
651 self
.checkResponseNoEDNS(response
, receivedResponse
)
653 def testDOHExistingEDNS(self
):
655 DOH with ECS: Existing EDNS
657 name
= 'existing-edns.doh-ecs.tests.powerdns.com.'
658 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
660 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
661 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192, options
=[rewrittenEcso
])
662 response
= dns
.message
.make_response(query
)
663 rrset
= dns
.rrset
.from_text(name
,
668 response
.answer
.append(rrset
)
670 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
671 self
.assertTrue(receivedQuery
)
672 self
.assertTrue(receivedResponse
)
673 receivedQuery
.id = expectedQuery
.id
674 self
.assertEquals(expectedQuery
, receivedQuery
)
675 self
.assertEquals(response
, receivedResponse
)
676 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
677 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
679 def testDOHExistingECS(self
):
681 DOH with ECS: Existing EDNS Client Subnet
683 name
= 'existing-ecs.doh-ecs.tests.powerdns.com.'
684 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
685 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
686 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
688 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[rewrittenEcso
])
689 response
= dns
.message
.make_response(query
)
690 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
691 rrset
= dns
.rrset
.from_text(name
,
696 response
.answer
.append(rrset
)
698 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
699 self
.assertTrue(receivedQuery
)
700 self
.assertTrue(receivedResponse
)
701 receivedQuery
.id = expectedQuery
.id
702 self
.assertEquals(expectedQuery
, receivedQuery
)
703 self
.assertEquals(response
, receivedResponse
)
704 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
705 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
707 class TestDOHOverHTTP(DNSDistDOHTest
):
709 _dohServerPort
= 8480
710 _serverName
= 'tls.tests.dnsdist.org'
711 _dohBaseURL
= ("http://%s:%d/" % (_serverName
, _dohServerPort
))
712 _config_template
= """
713 newServer{address="127.0.0.1:%s"}
714 addDOHLocal("127.0.0.1:%s")
716 _config_params
= ['_testServerPort', '_dohServerPort']
718 def testDOHSimple(self
):
720 DOH over HTTP: Simple query
722 name
= 'simple.doh-over-http.tests.powerdns.com.'
723 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
725 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
726 response
= dns
.message
.make_response(query
)
727 rrset
= dns
.rrset
.from_text(name
,
732 response
.answer
.append(rrset
)
734 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
735 self
.assertTrue(receivedQuery
)
736 self
.assertTrue(receivedResponse
)
737 expectedQuery
.id = receivedQuery
.id
738 self
.assertEquals(expectedQuery
, receivedQuery
)
739 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
740 self
.assertEquals(response
, receivedResponse
)
741 self
.checkResponseNoEDNS(response
, receivedResponse
)
743 def testDOHSimplePOST(self
):
745 DOH over HTTP: Simple POST query
747 name
= 'simple-post.doh-over-http.tests.powerdns.com.'
748 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
750 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
752 response
= dns
.message
.make_response(query
)
753 rrset
= dns
.rrset
.from_text(name
,
758 response
.answer
.append(rrset
)
760 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, useHTTPS
=False)
761 self
.assertTrue(receivedQuery
)
762 self
.assertTrue(receivedResponse
)
763 receivedQuery
.id = expectedQuery
.id
764 self
.assertEquals(expectedQuery
, receivedQuery
)
765 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
766 self
.assertEquals(response
, receivedResponse
)
767 self
.checkResponseNoEDNS(response
, receivedResponse
)