]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
12 class DNSDistDOHTest(DNSDistTest
):
15 def getDOHGetURL(cls
, baseurl
, query
, rawQuery
=False):
19 wire
= query
.to_wire()
20 param
= base64
.urlsafe_b64encode(wire
).decode('UTF8').rstrip('=')
21 return baseurl
+ "?dns=" + param
24 def openDOHConnection(cls
, port
, caFile
, timeout
=2.0):
26 conn
.setopt(pycurl
.HTTP_VERSION
, pycurl
.CURL_HTTP_VERSION_2
)
28 conn
.setopt(pycurl
.HTTPHEADER
, ["Content-type: application/dns-message",
29 "Accept: application/dns-message"])
33 def sendDOHQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True, rawQuery
=False):
34 url
= cls
.getDOHGetURL(baseurl
, query
, rawQuery
)
35 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
36 #conn.setopt(pycurl.VERBOSE, True)
37 conn
.setopt(pycurl
.URL
, url
)
38 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
39 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
40 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
42 conn
.setopt(pycurl
.CAINFO
, caFile
)
45 cls
._toResponderQueue
.put(response
, True, timeout
)
49 data
= conn
.perform_rb()
50 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
52 message
= dns
.message
.from_wire(data
)
54 if useQueue
and not cls
._fromResponderQueue
.empty():
55 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
57 return (receivedQuery
, message
)
60 # def openDOHConnection(cls, port, caFile, timeout=2.0):
61 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
62 # sslctx.load_verify_locations(caFile)
63 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
66 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
67 # url = cls.getDOHGetURL(baseurl, query)
70 # cls._toResponderQueue.put(response, True, timeout)
72 # conn.request('GET', url)
75 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
77 # data = conn.get_response()
81 # message = dns.message.from_wire(data)
83 # if useQueue and not cls._fromResponderQueue.empty():
84 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
85 # return (receivedQuery, message)
89 class TestDOH(DNSDistDOHTest
):
91 _serverKey
= 'server.key'
92 _serverCert
= 'server.chain'
93 _serverName
= 'tls.tests.dnsdist.org'
96 _serverName
= 'tls.tests.dnsdist.org'
97 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
98 _config_template
= """
99 newServer{address="127.0.0.1:%s"}
100 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
102 addAction("drop.doh.tests.powerdns.com.", DropAction())
103 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
104 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
106 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
108 def testDOHSimple(self
):
112 name
= 'simple.doh.tests.powerdns.com.'
113 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
115 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
117 response
= dns
.message
.make_response(query
)
118 rrset
= dns
.rrset
.from_text(name
,
123 response
.answer
.append(rrset
)
125 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
126 self
.assertTrue(receivedQuery
)
127 self
.assertTrue(receivedResponse
)
128 receivedQuery
.id = expectedQuery
.id
129 self
.assertEquals(expectedQuery
, receivedQuery
)
130 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
131 self
.assertEquals(response
, receivedResponse
)
133 def testDOHExistingEDNS(self
):
137 name
= 'existing-edns.doh.tests.powerdns.com.'
138 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
140 response
= dns
.message
.make_response(query
)
141 rrset
= dns
.rrset
.from_text(name
,
146 response
.answer
.append(rrset
)
148 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
149 self
.assertTrue(receivedQuery
)
150 self
.assertTrue(receivedResponse
)
151 receivedQuery
.id = query
.id
152 self
.assertEquals(query
, receivedQuery
)
153 self
.assertEquals(response
, receivedResponse
)
154 self
.checkQueryEDNSWithoutECS(query
, receivedQuery
)
155 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
157 def testDOHExistingECS(self
):
159 DOH: Existing EDNS Client Subnet
161 name
= 'existing-ecs.doh.tests.powerdns.com.'
162 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
163 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
164 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
166 response
= dns
.message
.make_response(query
)
167 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
168 rrset
= dns
.rrset
.from_text(name
,
173 response
.answer
.append(rrset
)
175 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
176 self
.assertTrue(receivedQuery
)
177 self
.assertTrue(receivedResponse
)
178 receivedQuery
.id = query
.id
179 self
.assertEquals(query
, receivedQuery
)
180 self
.assertEquals(response
, receivedResponse
)
181 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
182 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
184 def testDropped(self
):
188 name
= 'drop.doh.tests.powerdns.com.'
189 query
= dns
.message
.make_query(name
, 'A', 'IN')
190 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
191 print(receivedResponse
)
192 self
.assertEquals(receivedResponse
, None)
194 def testRefused(self
):
198 name
= 'refused.doh.tests.powerdns.com.'
199 query
= dns
.message
.make_query(name
, 'A', 'IN')
201 expectedResponse
= dns
.message
.make_response(query
)
202 expectedResponse
.set_rcode(dns
.rcode
.REFUSED
)
204 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
205 self
.assertEquals(receivedResponse
, expectedResponse
)
211 name
= 'spoof.doh.tests.powerdns.com.'
212 query
= dns
.message
.make_query(name
, 'A', 'IN')
214 query
.flags
&= ~dns
.flags
.RD
215 expectedResponse
= dns
.message
.make_response(query
)
216 rrset
= dns
.rrset
.from_text(name
,
221 expectedResponse
.answer
.append(rrset
)
223 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
224 self
.assertEquals(receivedResponse
, expectedResponse
)
226 def testDOHInvalid(self
):
230 name
= 'invalid.doh.tests.powerdns.com.'
231 invalidQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
233 # first an invalid query
234 invalidQuery
= invalidQuery
.to_wire()
235 invalidQuery
= invalidQuery
[:-5]
236 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=invalidQuery
, response
=None, useQueue
=False, rawQuery
=True)
237 self
.assertEquals(receivedResponse
, None)
239 # and now a valid one
240 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
242 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
244 response
= dns
.message
.make_response(query
)
245 rrset
= dns
.rrset
.from_text(name
,
250 response
.answer
.append(rrset
)
251 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
252 self
.assertTrue(receivedQuery
)
253 self
.assertTrue(receivedResponse
)
254 receivedQuery
.id = expectedQuery
.id
255 self
.assertEquals(expectedQuery
, receivedQuery
)
256 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
257 self
.assertEquals(response
, receivedResponse
)
259 class TestDOHAddingECS(DNSDistDOHTest
):
261 _serverKey
= 'server.key'
262 _serverCert
= 'server.chain'
263 _serverName
= 'tls.tests.dnsdist.org'
265 _dohServerPort
= 8443
266 _serverName
= 'tls.tests.dnsdist.org'
267 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
268 _config_template
= """
269 newServer{address="127.0.0.1:%s", useClientSubnet=true}
270 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
273 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
275 def testDOHSimple(self
):
277 DOH with ECS: Simple query
279 name
= 'simple.doh-ecs.tests.powerdns.com.'
280 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
282 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
283 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
284 response
= dns
.message
.make_response(query
)
285 rrset
= dns
.rrset
.from_text(name
,
290 response
.answer
.append(rrset
)
292 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
293 self
.assertTrue(receivedQuery
)
294 self
.assertTrue(receivedResponse
)
295 expectedQuery
.id = receivedQuery
.id
296 self
.assertEquals(expectedQuery
, receivedQuery
)
297 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
298 self
.assertEquals(response
, receivedResponse
)
299 self
.checkResponseNoEDNS(response
, receivedResponse
)
301 def testDOHExistingEDNS(self
):
303 DOH with ECS: Existing EDNS
305 name
= 'existing-edns.doh-ecs.tests.powerdns.com.'
306 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
308 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
309 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192, options
=[rewrittenEcso
])
310 response
= dns
.message
.make_response(query
)
311 rrset
= dns
.rrset
.from_text(name
,
316 response
.answer
.append(rrset
)
318 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
319 self
.assertTrue(receivedQuery
)
320 self
.assertTrue(receivedResponse
)
321 receivedQuery
.id = expectedQuery
.id
322 self
.assertEquals(expectedQuery
, receivedQuery
)
323 self
.assertEquals(response
, receivedResponse
)
324 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
325 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
327 def testDOHExistingECS(self
):
329 DOH with ECS: Existing EDNS Client Subnet
331 name
= 'existing-ecs.doh-ecs.tests.powerdns.com.'
332 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
333 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
334 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
336 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[rewrittenEcso
])
337 response
= dns
.message
.make_response(query
)
338 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
339 rrset
= dns
.rrset
.from_text(name
,
344 response
.answer
.append(rrset
)
346 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
347 self
.assertTrue(receivedQuery
)
348 self
.assertTrue(receivedResponse
)
349 receivedQuery
.id = expectedQuery
.id
350 self
.assertEquals(expectedQuery
, receivedQuery
)
351 self
.assertEquals(response
, receivedResponse
)
352 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
353 self
.checkResponseEDNSWithECS(response
, receivedResponse
)