# git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
4 import clientsubnetoption
5 from dnsdisttests
import DNSDistTest
9 #from hyper import HTTP20Connection
10 #from hyper.ssl_compat import SSLContext, PROTOCOL_TLSv1_2
12 class DNSDistDOHTest(DNSDistTest
):
def getDOHGetURL(cls, baseurl, query):
    """
    Build the URL for a DNS-over-HTTPS GET request.

    The query is serialized with its ``to_wire()`` method, encoded as
    URL-safe base64 with the trailing ``=`` padding removed, and attached
    to ``baseurl`` as the ``dns`` parameter.

    :param baseurl: URL of the DoH endpoint, with or without an existing
                    query string.
    :param query: object exposing ``to_wire()`` that returns the message
                  as bytes.
    :return: ``baseurl`` with the ``dns=<encoded query>`` parameter added.
    """
    # NOTE(review): callers in this file invoke this as
    # cls.getDOHGetURL(baseurl, query), so it is presumably bound as a
    # classmethod; the decorator is not visible in this excerpt - confirm.
    wire = query.to_wire()
    param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
    # Use '&' when the base URL already has a query string, so the result
    # never contains two '?' separators.
    separator = '&' if '?' in baseurl else '?'
    return baseurl + separator + "dns=" + param
21 def openDOHConnection(cls
, port
, caFile
, timeout
=2.0):
23 conn
.setopt(pycurl
.HTTP_VERSION
, pycurl
.CURL_HTTP_VERSION_2
)
25 conn
.setopt(pycurl
.HTTPHEADER
, ["Content-type: application/dns-message",
26 "Accept: application/dns-message"])
30 def sendDOHQuery(cls
, port
, servername
, baseurl
, query
, response
=None, timeout
=2.0, caFile
=None, useQueue
=True):
31 url
= cls
.getDOHGetURL(baseurl
, query
)
32 conn
= cls
.openDOHConnection(port
, caFile
=caFile
, timeout
=timeout
)
33 #conn.setopt(pycurl.VERBOSE, True)
34 conn
.setopt(pycurl
.URL
, url
)
35 conn
.setopt(pycurl
.RESOLVE
, ["%s:%d:127.0.0.1" % (servername
, port
)])
36 conn
.setopt(pycurl
.SSL_VERIFYPEER
, 1)
37 conn
.setopt(pycurl
.SSL_VERIFYHOST
, 2)
39 conn
.setopt(pycurl
.CAINFO
, caFile
)
42 cls
._toResponderQueue
.put(response
, True, timeout
)
46 data
= conn
.perform_rb()
47 rcode
= conn
.getinfo(pycurl
.RESPONSE_CODE
)
49 message
= dns
.message
.from_wire(data
)
51 if useQueue
and not cls
._fromResponderQueue
.empty():
52 receivedQuery
= cls
._fromResponderQueue
.get(True, timeout
)
54 return (receivedQuery
, message
)
57 # def openDOHConnection(cls, port, caFile, timeout=2.0):
58 # sslctx = SSLContext(PROTOCOL_TLSv1_2)
59 # sslctx.load_verify_locations(caFile)
60 # return HTTP20Connection('127.0.0.1', port=port, secure=True, timeout=timeout, ssl_context=sslctx, force_proto='h2')
63 # def sendDOHQueryOverConnection(cls, conn, baseurl, query, response=None, timeout=2.0):
64 # url = cls.getDOHGetURL(baseurl, query)
67 # cls._toResponderQueue.put(response, True, timeout)
69 # conn.request('GET', url)
72 # def recvDOHResponseOverConnection(cls, conn, useQueue=False, timeout=2.0):
74 # data = conn.get_response()
78 # message = dns.message.from_wire(data)
80 # if useQueue and not cls._fromResponderQueue.empty():
81 # receivedQuery = cls._fromResponderQueue.get(True, timeout)
82 # return (receivedQuery, message)
86 class TestDOH(DNSDistDOHTest
):
88 _serverKey
= 'server.key'
89 _serverCert
= 'server.chain'
90 _serverName
= 'tls.tests.dnsdist.org'
93 _serverName
= 'tls.tests.dnsdist.org'
94 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
95 _config_template
= """
96 newServer{address="127.0.0.1:%s"}
97 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
99 addAction("drop.doh.tests.powerdns.com.", DropAction())
100 addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
101 addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
103 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
105 def testDOHSimple(self
):
109 name
= 'simple.doh.tests.powerdns.com.'
110 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
112 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096)
114 response
= dns
.message
.make_response(query
)
115 rrset
= dns
.rrset
.from_text(name
,
120 response
.answer
.append(rrset
)
122 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
123 self
.assertTrue(receivedQuery
)
124 self
.assertTrue(receivedResponse
)
125 receivedQuery
.id = expectedQuery
.id
126 self
.assertEquals(expectedQuery
, receivedQuery
)
127 self
.checkQueryEDNSWithoutECS(expectedQuery
, receivedQuery
)
128 self
.assertEquals(response
, receivedResponse
)
130 def testDOHExistingEDNS(self
):
134 name
= 'existing-edns.doh.tests.powerdns.com.'
135 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
137 response
= dns
.message
.make_response(query
)
138 rrset
= dns
.rrset
.from_text(name
,
143 response
.answer
.append(rrset
)
145 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
146 self
.assertTrue(receivedQuery
)
147 self
.assertTrue(receivedResponse
)
148 receivedQuery
.id = query
.id
149 self
.assertEquals(query
, receivedQuery
)
150 self
.assertEquals(response
, receivedResponse
)
151 self
.checkQueryEDNSWithoutECS(query
, receivedQuery
)
152 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
154 def testDOHExistingECS(self
):
156 DOH: Existing EDNS Client Subnet
158 name
= 'existing-ecs.doh.tests.powerdns.com.'
159 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
160 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.1', 24)
161 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
163 response
= dns
.message
.make_response(query
)
164 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
165 rrset
= dns
.rrset
.from_text(name
,
170 response
.answer
.append(rrset
)
172 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
173 self
.assertTrue(receivedQuery
)
174 self
.assertTrue(receivedResponse
)
175 receivedQuery
.id = query
.id
176 self
.assertEquals(query
, receivedQuery
)
177 self
.assertEquals(response
, receivedResponse
)
178 self
.checkQueryEDNSWithECS(query
, receivedQuery
)
179 self
.checkResponseEDNSWithECS(response
, receivedResponse
)
def testDropped(self):
    """
    DOH: a query for the name matched by the DropAction rule yields no response
    """
    name = 'drop.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # No backend response is queued and the responder queue is not read
    # (response=None, useQueue=False); only the value returned to the
    # DoH client is checked.
    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort,
                                              self._serverName,
                                              self._dohBaseURL,
                                              caFile=self._caCert,
                                              query=query,
                                              response=None,
                                              useQueue=False)
    # assertIsNone replaces the deprecated assertEquals(x, None) alias.
    self.assertIsNone(receivedResponse)
def testRefused(self):
    """
    DOH: a query for the name matched by the RCodeAction rule is answered with REFUSED
    """
    name = 'refused.doh.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')

    # The expected answer is the plain response to the query with its
    # rcode set to REFUSED.
    expectedResponse = dns.message.make_response(query)
    expectedResponse.set_rcode(dns.rcode.REFUSED)

    # No backend response is queued and the responder queue is not read
    # (response=None, useQueue=False).
    (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort,
                                              self._serverName,
                                              self._dohBaseURL,
                                              caFile=self._caCert,
                                              query=query,
                                              response=None,
                                              useQueue=False)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(receivedResponse, expectedResponse)
208 name
= 'spoof.doh.tests.powerdns.com.'
209 query
= dns
.message
.make_query(name
, 'A', 'IN')
211 query
.flags
&= ~dns
.flags
.RD
212 expectedResponse
= dns
.message
.make_response(query
)
213 rrset
= dns
.rrset
.from_text(name
,
218 expectedResponse
.answer
.append(rrset
)
220 (_
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, caFile
=self
._caCert
, query
=query
, response
=None, useQueue
=False)
221 self
.assertEquals(receivedResponse
, expectedResponse
)
224 class TestDOHAddingECS(DNSDistDOHTest
):
226 _serverKey
= 'server.key'
227 _serverCert
= 'server.chain'
228 _serverName
= 'tls.tests.dnsdist.org'
230 _dohServerPort
= 8443
231 _serverName
= 'tls.tests.dnsdist.org'
232 _dohBaseURL
= ("https://%s:%d/" % (_serverName
, _dohServerPort
))
233 _config_template
= """
234 newServer{address="127.0.0.1:%s", useClientSubnet=true}
235 addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
238 _config_params
= ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
240 def testDOHSimple(self
):
242 DOH with ECS: Simple query
244 name
= 'simple.doh-ecs.tests.powerdns.com.'
245 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
247 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
248 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=4096, options
=[rewrittenEcso
])
249 response
= dns
.message
.make_response(query
)
250 rrset
= dns
.rrset
.from_text(name
,
255 response
.answer
.append(rrset
)
257 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
258 self
.assertTrue(receivedQuery
)
259 self
.assertTrue(receivedResponse
)
260 expectedQuery
.id = receivedQuery
.id
261 self
.assertEquals(expectedQuery
, receivedQuery
)
262 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
263 self
.assertEquals(response
, receivedResponse
)
264 self
.checkResponseNoEDNS(response
, receivedResponse
)
266 def testDOHExistingEDNS(self
):
268 DOH with ECS: Existing EDNS
270 name
= 'existing-edns.doh-ecs.tests.powerdns.com.'
271 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192)
273 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
274 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=8192, options
=[rewrittenEcso
])
275 response
= dns
.message
.make_response(query
)
276 rrset
= dns
.rrset
.from_text(name
,
281 response
.answer
.append(rrset
)
283 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
284 self
.assertTrue(receivedQuery
)
285 self
.assertTrue(receivedResponse
)
286 receivedQuery
.id = expectedQuery
.id
287 self
.assertEquals(expectedQuery
, receivedQuery
)
288 self
.assertEquals(response
, receivedResponse
)
289 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
290 self
.checkResponseEDNSWithoutECS(response
, receivedResponse
)
292 def testDOHExistingECS(self
):
294 DOH with ECS: Existing EDNS Client Subnet
296 name
= 'existing-ecs.doh-ecs.tests.powerdns.com.'
297 ecso
= clientsubnetoption
.ClientSubnetOption('1.2.3.4')
298 rewrittenEcso
= clientsubnetoption
.ClientSubnetOption('127.0.0.0', 24)
299 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[ecso
], want_dnssec
=True)
301 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=True, payload
=512, options
=[rewrittenEcso
])
302 response
= dns
.message
.make_response(query
)
303 response
.use_edns(edns
=True, payload
=4096, options
=[rewrittenEcso
])
304 rrset
= dns
.rrset
.from_text(name
,
309 response
.answer
.append(rrset
)
311 (receivedQuery
, receivedResponse
) = self
.sendDOHQuery(self
._dohServerPort
, self
._serverName
, self
._dohBaseURL
, query
, response
=response
, caFile
=self
._caCert
)
312 self
.assertTrue(receivedQuery
)
313 self
.assertTrue(receivedResponse
)
314 receivedQuery
.id = expectedQuery
.id
315 self
.assertEquals(expectedQuery
, receivedQuery
)
316 self
.assertEquals(response
, receivedResponse
)
317 self
.checkQueryEDNSWithECS(expectedQuery
, receivedQuery
)
318 self
.checkResponseEDNSWithECS(response
, receivedResponse
)