]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
12 class DNSDistDOHTest(DNSDistTest
):
15 def getDOHGetURL(cls
, baseurl
, query
, rawQuery
=False):
19 wire
= query
.to_wire()
20 param
= base64
.urlsafe_b64encode(wire
).decode('UTF8').rstrip('=')
21 return baseurl
+ "?dns=" + param
24 def openDOHConnection(cls
, port
, caFile
, timeout
=2.0):
26 conn
.setopt(pycurl
.HTTP_VERSION
, pycurl
.CURL_HTTP_VERSION_2
)
28 conn
.setopt(pycurl
.HTTPHEADER
, ["Content-type: application/dns-message",
29 "Accept: application/dns-message"])
33 def sendDOHQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[]):
34 url
= cls
.getDOHGetURL(baseurl
, query
, rawQuery
)
35 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
36 response_headers
= BytesIO()
37 #conn.setopt(pycurl.VERBOSE, True)
38 conn
.setopt(pycurl
.URL
, url
)
39 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
40 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
41 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
42 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
43 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
45 conn
.setopt(pycurl
.CAINFO
, caFile
)
48 cls
._toResponderQueue
.put(response
, True, timeout
)
52 cls
._response
_headers
= ''
53 data
= conn
.perform_rb()
54 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
55 if cls
._rcode
== 200 and not rawResponse
:
56 message
= dns
.message
.from_wire(data
)
60 if useQueue
and not cls
._fromResponderQueue
.empty():
61 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
63 cls
._response
_headers
= response_headers
.getvalue()
64 return (receivedQuery
, message
)
67 def sendDOHPostQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False, rawResponse
=False, customHeaders
=[]):
69 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
70 response_headers
= BytesIO()
71 #conn.setopt(pycurl.VERBOSE, True)
72 conn
.setopt(pycurl
.URL
, url
)
73 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
74 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
75 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
76 conn
.setopt(pycurl
.HTTPHEADER
, customHeaders
)
77 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
78 conn
.setopt(pycurl
.POST
, True)
83 conn
.setopt(pycurl
.POSTFIELDS
, data
)
86 conn
.setopt(pycurl
.CAINFO
, caFile
)
89 cls
._toResponderQueue
.put(response
, True, timeout
)
93 cls
._response
_headers
= ''
94 data
= conn
.perform_rb()
95 cls
._rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
96 if cls
._rcode
== 200 and not rawResponse
:
97 message
= dns
.message
.from_wire(data
)
101 if useQueue
and not cls
._fromResponderQueue
.empty():
102 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
104 cls
._response
_headers
= response_headers
.getvalue()
105 return (receivedQuery
, message
)
108 # def openDOHConnection(cls, port, caFile, timeout=2.0):
109 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
110 # sslctx.load_verify_locations(caFile)
111 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
114 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
115 # url = cls.getDOHGetURL(baseurl, query)
118 # cls._toResponderQueue.put(response, True, timeout)
120 # conn.request('GET', url)
123 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
125 # data = conn.get_response()
129 # message = dns.message.from_wire(data)
131 # if useQueue and not cls._fromResponderQueue.empty():
132 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
133 # return (receivedQuery, message)
137 class TestDOH(DNSDistDOHTest
):
139 _serverKey
= 'server.key'
140 _serverCert
= 'server.chain'
141 _serverName
= 'tls.tests.dnsdist.org'
143 _dohServerPort
= 8443
144 _customResponseHeader1
= 'access-control-allow-origin: *'
145 _customResponseHeader2
= 'user-agent: derp'
146 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
147 _config_template
= """
148 newServer{address="127.0.0.1:%s"}
150 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp"}})
151 dohFE = getDOHFrontend(0)
152 dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['foo']='bar'})})
154 addAction("drop.doh.tests.powerdns.com.", DropAction())
155 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
156 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
157 addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
158 addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
159 addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
160 addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
161 addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))
163 function dohHandler(dq)
164 if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
165 local foundct = false
166 for key,value in pairs(dq:getHTTPHeaders()) do
167 if key == 'content-type' and value == 'application/dns-message' then
173 dq:setHTTPResponse(200, 'It works!', 'text/plain')
175 return DNSAction.HeaderModify
178 return DNSAction.None
180 addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
182 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']
184 def testDOHSimple(self
):
188 name
= 'simple.doh.tests.powerdns.com.'
189 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
191 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
193 response
= dns
.message
.make_response(query
)
194 rrset
= dns
.rrset
.from_text(name
,
199 response
.answer
.append(rrset
)
201 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
202 self
.assertTrue(receivedQuery
)
203 self
.assertTrue(receivedResponse
)
204 receivedQuery
.id = expectedQuery
.id
205 self
.assertEquals(expectedQuery
, receivedQuery
)
206 self
.assertTrue((self
._customResponseHeader
1) in self
._response
_headers
.decode())
207 self
.assertTrue((self
._customResponseHeader
2) in self
._response
_headers
.decode())
208 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
209 self
.assertEquals(response
, receivedResponse
)
211 def testDOHSimplePOST(self
):
213 DOH: Simple POST query
215 name
= 'simple-post.doh.tests.powerdns.com.'
216 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
218 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
220 response
= dns
.message
.make_response(query
)
221 rrset
= dns
.rrset
.from_text(name
,
226 response
.answer
.append(rrset
)
228 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
229 self
.assertTrue(receivedQuery
)
230 self
.assertTrue(receivedResponse
)
231 receivedQuery
.id = expectedQuery
.id
232 self
.assertEquals(expectedQuery
, receivedQuery
)
233 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
234 self
.assertEquals(response
, receivedResponse
)
236 def testDOHExistingEDNS(self
):
240 name
= 'existing-edns.doh.tests.powerdns.com.'
241 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
243 response
= dns
.message
.make_response(query
)
244 rrset
= dns
.rrset
.from_text(name
,
249 response
.answer
.append(rrset
)
251 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
252 self
.assertTrue(receivedQuery
)
253 self
.assertTrue(receivedResponse
)
254 receivedQuery
.id = query
.id
255 self
.assertEquals(query
, receivedQuery
)
256 self
.assertEquals(response
, receivedResponse
)
257 self
.checkQueryEDNSWithoutECS(query
, receivedQuery
)
258 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
260 def testDOHExistingECS(self
):
262 DOH: Existing EDNS Client Subnet
264 name
= 'existing-ecs.doh.tests.powerdns.com.'
265 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
266 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
267 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
269 response
= dns
.message
.make_response(query
)
270 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
271 rrset
= dns
.rrset
.from_text(name
,
276 response
.answer
.append(rrset
)
278 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
279 self
.assertTrue(receivedQuery
)
280 self
.assertTrue(receivedResponse
)
281 receivedQuery
.id = query
.id
282 self
.assertEquals(query
, receivedQuery
)
283 self
.assertEquals(response
, receivedResponse
)
284 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
285 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
287 def testDropped(self
):
291 name
= 'drop.doh.tests.powerdns.com.'
292 query
= dns
.message
.make_query(name
, 'A', 'IN')
293 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
294 self
.assertEquals(receivedResponse
, None)
296 def testRefused(self
):
300 name
= 'refused.doh.tests.powerdns.com.'
301 query
= dns
.message
.make_query(name
, 'A', 'IN')
303 expectedResponse
= dns
.message
.make_response(query
)
304 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
306 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
307 self
.assertEquals(receivedResponse
, expectedResponse
)
313 name
= 'spoof.doh.tests.powerdns.com.'
314 query
= dns
.message
.make_query(name
, 'A', 'IN')
316 query
.flags
&= ~dns
.flags
.RD
317 expectedResponse
= dns
.message
.make_response(query
)
318 rrset
= dns
.rrset
.from_text(name
,
323 expectedResponse
.answer
.append(rrset
)
325 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
326 self
.assertEquals(receivedResponse
, expectedResponse
)
328 def testDOHInvalid(self
):
332 name
= 'invalid.doh.tests.powerdns.com.'
333 invalidQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
335 # first an invalid query
336 invalidQuery
= invalidQuery
.to_wire()
337 invalidQuery
= invalidQuery
[:-5]
338 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=invalidQuery
, response
=None, useQueue
=False, rawQuery
=True)
339 self
.assertEquals(receivedResponse
, None)
341 # and now a valid one
342 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
344 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
346 response
= dns
.message
.make_response(query
)
347 rrset
= dns
.rrset
.from_text(name
,
352 response
.answer
.append(rrset
)
353 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
354 self
.assertTrue(receivedQuery
)
355 self
.assertTrue(receivedResponse
)
356 receivedQuery
.id = expectedQuery
.id
357 self
.assertEquals(expectedQuery
, receivedQuery
)
358 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
359 self
.assertEquals(response
, receivedResponse
)
361 def testDOHWithoutQuery(self
):
365 name
= 'empty-get.doh.tests.powerdns.com.'
366 url
= self
._dohBaseURL
367 conn
= self
.openDOHConnection(self
._dohServerPort
, self
._caCert
, timeout
=2.0)
368 conn
.setopt(pycurl
.URL
, url
)
369 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
370 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
371 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
372 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
373 data
= conn
.perform_rb()
374 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
375 self
.assertEquals(rcode
, 400)
377 def testDOHEmptyPOST(self
):
379 DOH: Empty POST query
381 name
= 'empty-post.doh.tests.powerdns.com.'
383 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
="", rawQuery
=True, response
=None, caFile
=self
._caCert
)
384 self
.assertEquals(receivedResponse
, None)
386 # and now a valid one
387 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
389 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
391 response
= dns
.message
.make_response(query
)
392 rrset
= dns
.rrset
.from_text(name
,
397 response
.answer
.append(rrset
)
398 (receivedQuery
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
399 self
.assertTrue(receivedQuery
)
400 self
.assertTrue(receivedResponse
)
401 receivedQuery
.id = expectedQuery
.id
402 self
.assertEquals(expectedQuery
, receivedQuery
)
403 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
404 self
.assertEquals(response
, receivedResponse
)
406 def testHeaderRule(self
):
410 name
= 'header-rule.doh.tests.powerdns.com.'
411 query
= dns
.message
.make_query(name
, 'A', 'IN')
413 query
.flags
&= ~dns
.flags
.RD
414 expectedResponse
= dns
.message
.make_response(query
)
415 rrset
= dns
.rrset
.from_text(name
,
420 expectedResponse
.answer
.append(rrset
)
422 # this header should match
423 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False, customHeaders
=['x-powerdnS: aaaaa'])
424 self
.assertEquals(receivedResponse
, expectedResponse
)
426 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
427 expectedQuery
.flags
&= ~dns
.flags
.RD
429 response
= dns
.message
.make_response(query
)
430 rrset
= dns
.rrset
.from_text(name
,
435 response
.answer
.append(rrset
)
437 # this content of the header should NOT match
438 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
, customHeaders
=['x-powerdnS: bbbbb'])
439 self
.assertTrue(receivedQuery
)
440 self
.assertTrue(receivedResponse
)
441 receivedQuery
.id = expectedQuery
.id
442 self
.assertEquals(expectedQuery
, receivedQuery
)
443 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
444 self
.assertEquals(response
, receivedResponse
)
446 def testHTTPPath(self
):
450 name
= 'http-path.doh.tests.powerdns.com.'
451 query
= dns
.message
.make_query(name
, 'A', 'IN')
453 query
.flags
&= ~dns
.flags
.RD
454 expectedResponse
= dns
.message
.make_response(query
)
455 rrset
= dns
.rrset
.from_text(name
,
460 expectedResponse
.answer
.append(rrset
)
462 # this path should match
463 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
464 self
.assertEquals(receivedResponse
, expectedResponse
)
466 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
468 expectedQuery
.flags
&= ~dns
.flags
.RD
469 response
= dns
.message
.make_response(query
)
470 rrset
= dns
.rrset
.from_text(name
,
475 response
.answer
.append(rrset
)
477 # this path should NOT match
478 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
479 self
.assertTrue(receivedQuery
)
480 self
.assertTrue(receivedResponse
)
481 receivedQuery
.id = expectedQuery
.id
482 self
.assertEquals(expectedQuery
, receivedQuery
)
483 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
484 self
.assertEquals(response
, receivedResponse
)
486 def testHTTPPathRegex(self
):
490 name
= 'http-path-regex.doh.tests.powerdns.com.'
491 query
= dns
.message
.make_query(name
, 'A', 'IN')
493 query
.flags
&= ~dns
.flags
.RD
494 expectedResponse
= dns
.message
.make_response(query
)
495 rrset
= dns
.rrset
.from_text(name
,
500 expectedResponse
.answer
.append(rrset
)
502 # this path should match
503 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ 'PowerDNS-999', caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
504 self
.assertEquals(receivedResponse
, expectedResponse
)
506 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
508 expectedQuery
.flags
&= ~dns
.flags
.RD
509 response
= dns
.message
.make_response(query
)
510 rrset
= dns
.rrset
.from_text(name
,
515 response
.answer
.append(rrset
)
517 # this path should NOT match
518 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
+ "PowerDNS2", query
, response
=response
, caFile
=self
._caCert
)
519 self
.assertTrue(receivedQuery
)
520 self
.assertTrue(receivedResponse
)
521 receivedQuery
.id = expectedQuery
.id
522 self
.assertEquals(expectedQuery
, receivedQuery
)
523 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
524 self
.assertEquals(response
, receivedResponse
)
526 def testHTTPStatusAction200(self
):
528 DOH: HTTPStatusAction 200 OK
530 name
= 'http-status-action.doh.tests.powerdns.com.'
531 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
534 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
535 self
.assertTrue(receivedResponse
)
536 self
.assertEquals(receivedResponse
, b
'Plaintext answer')
537 self
.assertEquals(self
._rcode
, 200)
538 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
540 def testHTTPStatusAction307(self
):
542 DOH: HTTPStatusAction 307
544 name
= 'http-status-action-redirect.doh.tests.powerdns.com.'
545 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
548 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
549 self
.assertTrue(receivedResponse
)
550 self
.assertEquals(self
._rcode
, 307)
551 self
.assertTrue('location: https://doh.powerdns.org' in self
._response
_headers
.decode())
553 def testHTTPLuaResponse(self
):
555 DOH: Lua HTTP Response
557 name
= 'http-lua.doh.tests.powerdns.com.'
558 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
561 (_
, receivedResponse
) = self
.sendDOHPostQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, caFile
=self
._caCert
, useQueue
=False, rawResponse
=True)
562 self
.assertTrue(receivedResponse
)
563 self
.assertEquals(receivedResponse
, b
'It works!')
564 self
.assertEquals(self
._rcode
, 200)
565 self
.assertTrue('content-type: text/plain' in self
._response
_headers
.decode())
567 def testHTTPEarlyResponse(self
):
569 DOH: HTTP Early Response
571 response_headers
= BytesIO()
572 url
= self
._dohBaseURL
+ 'coffee'
573 conn
= self
.openDOHConnection(self
._dohServerPort
, caFile
=self
._caCert
, timeout
=2.0)
574 conn
.setopt(pycurl
.URL
, url
)
575 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
576 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
577 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
578 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
579 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
580 data
= conn
.perform_rb()
581 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
582 headers
= response_headers
.getvalue().decode()
584 self
.assertEquals(rcode
, 418)
585 self
.assertEquals(data
, b
'C0FFEE')
586 self
.assertIn('foo: bar', headers
)
587 self
.assertNotIn(self
._customResponseHeader
2, headers
)
589 response_headers
= BytesIO()
590 conn
= self
.openDOHConnection(self
._dohServerPort
, caFile
=self
._caCert
, timeout
=2.0)
591 conn
.setopt(pycurl
.URL
, url
)
592 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (self
._serverName
, self
._dohServerPort
)])
593 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
594 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
595 conn
.setopt(pycurl
.CAINFO
, self
._caCert
)
596 conn
.setopt(pycurl
.HEADERFUNCTION
, response_headers
.write
)
597 conn
.setopt(pycurl
.POST
, True)
599 conn
.setopt(pycurl
.POSTFIELDS
, data
)
601 data
= conn
.perform_rb()
602 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
603 headers
= response_headers
.getvalue().decode()
604 self
.assertEquals(rcode
, 418)
605 self
.assertEquals(data
, b
'C0FFEE')
606 self
.assertIn('foo: bar', headers
)
607 self
.assertNotIn(self
._customResponseHeader
2, headers
)
609 class TestDOHAddingECS(DNSDistDOHTest
):
611 _serverKey
= 'server.key'
612 _serverCert
= 'server.chain'
613 _serverName
= 'tls.tests.dnsdist.org'
615 _dohServerPort
= 8443
616 _serverName
= 'tls.tests.dnsdist.org'
617 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
618 _config_template
= """
619 newServer{address="127.0.0.1:%s", useClientSubnet=true}
620 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
623 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
625 def testDOHSimple(self
):
627 DOH with ECS: Simple query
629 name
= 'simple.doh-ecs.tests.powerdns.com.'
630 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
632 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
633 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
634 response
= dns
.message
.make_response(query
)
635 rrset
= dns
.rrset
.from_text(name
,
640 response
.answer
.append(rrset
)
642 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
643 self
.assertTrue(receivedQuery
)
644 self
.assertTrue(receivedResponse
)
645 expectedQuery
.id = receivedQuery
.id
646 self
.assertEquals(expectedQuery
, receivedQuery
)
647 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
648 self
.assertEquals(response
, receivedResponse
)
649 self
.checkResponseNoEDNS(response
, receivedResponse
)
651 def testDOHExistingEDNS(self
):
653 DOH with ECS: Existing EDNS
655 name
= 'existing-edns.doh-ecs.tests.powerdns.com.'
656 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
658 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
659 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192, options
=[rewrittenEcso
])
660 response
= dns
.message
.make_response(query
)
661 rrset
= dns
.rrset
.from_text(name
,
666 response
.answer
.append(rrset
)
668 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
669 self
.assertTrue(receivedQuery
)
670 self
.assertTrue(receivedResponse
)
671 receivedQuery
.id = expectedQuery
.id
672 self
.assertEquals(expectedQuery
, receivedQuery
)
673 self
.assertEquals(response
, receivedResponse
)
674 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
675 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
677 def testDOHExistingECS(self
):
679 DOH with ECS: Existing EDNS Client Subnet
681 name
= 'existing-ecs.doh-ecs.tests.powerdns.com.'
682 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
683 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
684 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
686 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[rewrittenEcso
])
687 response
= dns
.message
.make_response(query
)
688 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
689 rrset
= dns
.rrset
.from_text(name
,
694 response
.answer
.append(rrset
)
696 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
697 self
.assertTrue(receivedQuery
)
698 self
.assertTrue(receivedResponse
)
699 receivedQuery
.id = expectedQuery
.id
700 self
.assertEquals(expectedQuery
, receivedQuery
)
701 self
.assertEquals(response
, receivedResponse
)
702 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
703 self
.checkResponseEDNSWithECS(response
, receivedResponse
)