# Source: thirdparty/pdns.git (git.ipfire.org Git mirror), blob regression-tests.dnsdist/test_DOH.py
import base64
from io import BytesIO

import clientsubnetoption
import dns
import dns.flags
import dns.message
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rrset
import pycurl
from dnsdisttests import DNSDistTest

#from hyper import HTTP20Connection
#from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
class DNSDistDOHTest(DNSDistTest):
    """Base class with pycurl-based DNS-over-HTTPS (DoH) client helpers.

    The helpers send a DNS message to the DoH frontend under test, either as a
    base64url-encoded ``dns`` GET parameter or as a POST body, and return the
    query seen by the test responder together with the parsed DNS answer.

    NOTE(review): this class was rebuilt from an extraction that dropped
    several original lines (decorators, ``if`` guards, ``return`` statements).
    The guards restored here are the ones the callers in this file rely on
    (``response=None``, ``rawQuery=True``, non-200 answers yielding ``None``);
    confirm against the upstream file.
    """

    @classmethod
    def getDOHGetURL(cls, baseurl, query, rawQuery=False):
        """Return `baseurl` with `query` appended as the ``dns`` GET parameter.

        `query` is a DNS message object (serialised with ``to_wire()``), or
        already-serialised bytes when `rawQuery` is true. The wire data is
        base64url-encoded with the trailing ``=`` padding stripped.
        """
        wire = query if rawQuery else query.to_wire()
        param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
        return baseurl + "?dns=" + param

    @classmethod
    def openDOHConnection(cls, port, caFile, timeout=2.0):
        """Create a pycurl handle preconfigured for HTTP/2 DoH requests.

        `port`, `caFile` and `timeout` are accepted for interface
        compatibility; in the visible code they are not applied to the handle
        here (callers set URL, RESOLVE and CAINFO themselves).
        The caller owns the returned handle and should ``close()`` it.
        """
        conn = pycurl.Curl()
        conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
        conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
                                         "Accept: application/dns-message"])
        return conn

    @classmethod
    def _setDOHTarget(cls, conn, url, servername, port, caFile):
        """Point `conn` at `url`, pinning `servername` to 127.0.0.1 with TLS verification on."""
        #conn.setopt(pycurl.VERBOSE, True)
        conn.setopt(pycurl.URL, url)
        # Resolve the certificate's host name to the local listener.
        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
        if caFile:
            conn.setopt(pycurl.CAINFO, caFile)

    @classmethod
    def _exchange(cls, conn, response, timeout, useQueue):
        """Queue `response` for the responder, perform the request, and collect results.

        Returns ``(receivedQuery, message)``; either may be ``None`` (no query
        waiting in the responder queue, or an HTTP status other than 200).
        """
        receivedQuery = None
        message = None

        if response:
            cls._toResponderQueue.put(response, True, timeout)

        data = conn.perform_rb()
        rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        if rcode == 200:
            message = dns.message.from_wire(data)

        if useQueue and not cls._fromResponderQueue.empty():
            receivedQuery = cls._fromResponderQueue.get(True, timeout)

        return (receivedQuery, message)

    @classmethod
    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, customHeaders=None):
        """Send `query` as a DoH GET request and return ``(receivedQuery, message)``.

        `response`, when set, is queued for the test responder before the
        request is performed. `customHeaders` is a list of raw header lines;
        ``None`` (the default) behaves like the former ``[]`` default. The raw
        response headers are stored in ``cls._response_headers`` as bytes.
        """
        # Avoid a shared mutable default while keeping the old "empty list" behaviour.
        headers = [] if customHeaders is None else customHeaders
        url = cls.getDOHGetURL(baseurl, query, rawQuery)
        response_headers = BytesIO()
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        try:
            cls._setDOHTarget(conn, url, servername, port, caFile)
            conn.setopt(pycurl.HTTPHEADER, headers)
            conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
            cls._response_headers = ''
            result = cls._exchange(conn, response, timeout, useQueue)
        finally:
            # Release the curl handle even when the transfer raises.
            conn.close()

        cls._response_headers = response_headers.getvalue()
        return result

    @classmethod
    def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False):
        """Send `query` as a DoH POST request and return ``(receivedQuery, message)``.

        The POST body is `query` itself when `rawQuery` is true, otherwise its
        ``to_wire()`` serialisation. `response`, when set, is queued for the
        test responder before the request is performed.
        """
        url = baseurl
        body = query if rawQuery else query.to_wire()
        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        try:
            cls._setDOHTarget(conn, url, servername, port, caFile)
            conn.setopt(pycurl.POST, True)
            conn.setopt(pycurl.POSTFIELDS, body)
            return cls._exchange(conn, response, timeout, useQueue)
        finally:
            # Release the curl handle even when the transfer raises.
            conn.close()

#     Disabled hyper-based implementation, kept for reference (some lines of
#     it were already missing from the extraction this file was rebuilt from).
#
#     def openDOHConnection(cls, port, caFile, timeout=2.0):
#         sslctx = SSLContext(PROTOCOL_TLSv1_2)
#         sslctx.load_verify_locations(caFile)
#         return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
#
#     def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
#         url = cls.getDOHGetURL(baseurl, query)
#         cls._toResponderQueue.put(response, True, timeout)
#         conn.request('GET', url)
#
#     def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
#         data = conn.get_response()
#         message = dns.message.from_wire(data)
#         if useQueue and not cls._fromResponderQueue.empty():
#             receivedQuery = cls._fromResponderQueue.get(True, timeout)
#         return (receivedQuery, message)
class TestDOH(DNSDistDOHTest):
    """DoH regression tests: plain forwarding, EDNS/ECS pass-through and rule actions.

    NOTE(review): rebuilt from an extraction that dropped lines; values that
    were not visible are flagged individually below.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    # NOTE(review): this assignment was missing from the extraction, but every
    # test reads self._caCert; 'ca.pem' is presumed - confirm against upstream.
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _customResponseHeader1 = 'access-control-allow-origin: *'
    _customResponseHeader2 = 'user-agent: derp'
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp"}})

    addAction("drop.doh.tests.powerdns.com.", DropAction())
    addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
    addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
    """
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    @staticmethod
    def _appendARecord(message, name, address='127.0.0.1'):
        """Append an IN A record for `name` to the answer section of `message`.

        NOTE(review): the original ``rrset.from_text(name, ...)`` arguments
        were lost in extraction. The TTL of 3600 and the default address are
        presumed; spoofed answers use the address configured in the matching
        ``SpoofAction`` of ``_config_template``.
        """
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    address)
        message.answer.append(rrset)
        return message

    def testDOHSimple(self):
        """
        DOH: Simple query
        """
        name = 'simple.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        # The backend is expected to see the query with EDNS (payload 4096) added.
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = self._appendARecord(dns.message.make_response(query), name)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        # Both configured custom response headers must be present.
        self.assertIn(self._customResponseHeader1, self._response_headers.decode())
        self.assertIn(self._customResponseHeader2, self._response_headers.decode())
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHSimplePOST(self):
        """
        DOH: Simple POST query
        """
        name = 'simple-post.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = self._appendARecord(dns.message.make_response(query), name)

        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH: Existing EDNS
        """
        name = 'existing-edns.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        response = self._appendARecord(dns.message.make_response(query), name)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithoutECS(query, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        self._appendARecord(response, name)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(query, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)

    def testDropped(self):
        """
        DOH: Dropped query
        """
        name = 'drop.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertIsNone(receivedResponse)

    def testRefused(self):
        """
        DOH: Refused
        """
        name = 'refused.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    # NOTE(review): the ``def`` line of this test was missing from the
    # extraction; the name follows the surrounding tests - confirm upstream.
    def testSpoof(self):
        """
        DOH: Spoofed
        """
        name = 'spoof.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared before deriving the expected answer; presumably the
        # spoofed answer's RA mirrors the query's RD - confirm.
        query.flags &= ~dns.flags.RD
        expectedResponse = self._appendARecord(dns.message.make_response(query), name, '1.2.3.4')

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDOHInvalid(self):
        """
        DOH: Invalid query
        """
        name = 'invalid.doh.tests.powerdns.com.'
        invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)

        # first an invalid query: the wire form truncated by five bytes
        invalidQuery = invalidQuery.to_wire()
        invalidQuery = invalidQuery[:-5]
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
        self.assertIsNone(receivedResponse)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = self._appendARecord(dns.message.make_response(query), name)
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHWithoutQuery(self):
        """
        DOH: Empty GET query
        """
        url = self._dohBaseURL
        conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
        try:
            conn.setopt(pycurl.URL, url)
            conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            conn.setopt(pycurl.CAINFO, self._caCert)
            # The body is irrelevant here; only the HTTP status is checked.
            conn.perform_rb()
            rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            conn.close()
        self.assertEqual(rcode, 400)

    def testDOHEmptyPOST(self):
        """
        DOH: Empty POST query
        """
        name = 'empty-post.doh.tests.powerdns.com.'

        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
        self.assertIsNone(receivedResponse)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = self._appendARecord(dns.message.make_response(query), name)
        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHeaderRule(self):
        """
        DOH: HeaderRule
        """
        name = 'header-rule.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared before deriving the expected answers (see testSpoof).
        query.flags &= ~dns.flags.RD
        expectedResponse = self._appendARecord(dns.message.make_response(query), name, '2.3.4.5')

        # this header should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False, customHeaders=['x-powerdnS: aaaaa'])
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.flags &= ~dns.flags.RD
        response = self._appendARecord(dns.message.make_response(query), name)

        # this content of the header should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-powerdnS: bbbbb'])
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPPath(self):
        """
        DOH: HTTPPath
        """
        name = 'http-path.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        # RD is cleared before deriving the expected answers (see testSpoof).
        query.flags &= ~dns.flags.RD
        expectedResponse = self._appendARecord(dns.message.make_response(query), name, '3.4.5.6')

        # this path should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS', caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.flags &= ~dns.flags.RD
        response = self._appendARecord(dns.message.make_response(query), name)

        # this path should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
class TestDOHAddingECS(DNSDistDOHTest):
    """DoH regression tests with ``useClientSubnet=true`` on the backend.

    Each test expects the query reaching the backend to carry an EDNS Client
    Subnet option for 127.0.0.0/24.

    NOTE(review): rebuilt from an extraction that dropped lines; values that
    were not visible are flagged individually below.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    # The original assigned _serverName twice with the same value; once is enough.
    _serverName = 'tls.tests.dnsdist.org'
    # NOTE(review): this assignment was missing from the extraction, but every
    # test reads self._caCert; 'ca.pem' is presumed - confirm against upstream.
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
    """
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    @staticmethod
    def _appendARecord(message, name, address='127.0.0.1'):
        """Append an IN A record for `name` to the answer section of `message`.

        NOTE(review): the original ``rrset.from_text(name, ...)`` arguments
        were lost in extraction; the TTL of 3600 and the address are presumed.
        """
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    address)
        message.answer.append(rrset)
        return message

    def testDOHSimple(self):
        """
        DOH with ECS: Simple query
        """
        name = 'simple.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        # The backend is expected to see EDNS (payload 4096) plus the added ECS option.
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
        response = self._appendARecord(dns.message.make_response(query), name)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Unlike the other tests, the ID is copied onto the expected query here.
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH with ECS: Existing EDNS
        """
        name = 'existing-edns.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
        response = self._appendARecord(dns.message.make_response(query), name)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH with ECS: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        self._appendARecord(response, name)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)