]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOH.py
Merge pull request #8945 from rgacogne/ddist-x-forwarded-for
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import os
5 import re
6 import time
7 import unittest
8 import clientsubnetoption
9 from dnsdisttests import DNSDistTest
10
11 import pycurl
12 from io import BytesIO
13
@unittest.skipIf('SKIP_DOH_TESTS' in os.environ, 'DNS over HTTPS tests are disabled')
class DNSDistDOHTest(DNSDistTest):
    """Shared plumbing for the DNS over HTTP(S) regression tests.

    Provides helpers to send a DNS query over HTTP GET or POST with pycurl,
    and to inspect the HTTP status code and response headers of the last
    exchange.  Those are stored on the class as `_rcode` and
    `_response_headers` (raw bytes) by the send helpers.
    """

    @classmethod
    def getDOHGetURL(cls, baseurl, query, rawQuery=False):
        """Return `baseurl` with the query appended as a `dns` GET parameter.

        `query` is serialized with `to_wire()` unless `rawQuery` is set, in
        which case it is used as-is (already wire-format bytes).  The payload
        is URL-safe base64 encoded with the trailing `=` padding removed.
        """
        wire = query if rawQuery else query.to_wire()
        param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
        return baseurl + "?dns=" + param

    @classmethod
    def openDOHConnection(cls, port, caFile, timeout=2.0):
        """Create a pycurl handle set up for HTTP/2 and DNS message content.

        `timeout` (in seconds) bounds the whole transfer; a falsy value
        disables the limit.  `port` and `caFile` are accepted for interface
        compatibility but are not used here: callers configure the URL and
        the TLS options themselves.  The caller owns the returned handle and
        is responsible for closing it.
        """
        conn = pycurl.Curl()
        conn.setopt(pycurl.HTTP_VERSION, pycurl.CURL_HTTP_VERSION_2)
        if timeout:
            # without this a stalled server would hang the test run forever
            conn.setopt(pycurl.TIMEOUT_MS, int(timeout * 1000))

        conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
                                        "Accept: application/dns-message"])
        return conn

    @classmethod
    def _performDOHExchange(cls, port, servername, url, postData, response, timeout, caFile, useQueue, rawResponse, customHeaders, useHTTPS):
        """Run one HTTP exchange and return `(receivedQuery, message)`.

        A POST is issued when `postData` is not None (an empty body still
        results in a POST), a GET otherwise.  See sendDOHQuery() for the
        meaning of the other parameters and of the returned tuple.
        """
        response_headers = BytesIO()
        # bytes, like the final value, so that getHeaderValue() keeps working
        # even if the transfer below raises
        cls._response_headers = b''

        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
        try:
            # conn.setopt(pycurl.VERBOSE, True)  # handy when debugging
            conn.setopt(pycurl.URL, url)
            # pin the server name to the loopback address dnsdist listens on
            conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
            if useHTTPS:
                conn.setopt(pycurl.SSL_VERIFYPEER, 1)
                conn.setopt(pycurl.SSL_VERIFYHOST, 2)
                if caFile:
                    conn.setopt(pycurl.CAINFO, caFile)

            # NOTE(review): with an empty list this appears to leave the
            # headers set by openDOHConnection() in place (testHTTPLuaResponse
            # relies on the content-type surviving) -- confirm against pycurl.
            conn.setopt(pycurl.HTTPHEADER, list(customHeaders) if customHeaders else [])
            conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)

            if postData is not None:
                conn.setopt(pycurl.POST, True)
                conn.setopt(pycurl.POSTFIELDS, postData)

            if response:
                cls._toResponderQueue.put(response, True, timeout)

            data = conn.perform_rb()
            cls._rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            # release the curl handle (and its connection) in every case
            conn.close()

        cls._response_headers = response_headers.getvalue()

        message = None
        if rawResponse:
            message = data
        elif cls._rcode == 200:
            message = dns.message.from_wire(data)

        receivedQuery = None
        if useQueue and not cls._fromResponderQueue.empty():
            receivedQuery = cls._fromResponderQueue.get(True, timeout)

        return (receivedQuery, message)

    @classmethod
    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=None, useHTTPS=True):
        """Send `query` over HTTP GET and return `(receivedQuery, message)`.

        - `response`, if set, is queued on `_toResponderQueue` before the
          request is sent.
        - `rawQuery` means `query` is already wire-format bytes.
        - `customHeaders` is a list of "Name: value" strings handed to curl.
        - `useHTTPS` enables peer and host verification, against `caFile`
          when one is given.

        `receivedQuery` is taken from `_fromResponderQueue` when `useQueue`
        is set and the queue is not empty, None otherwise.  `message` is the
        raw body if `rawResponse` is set, the parsed DNS message if the HTTP
        status code was 200, None otherwise.  The status code and the raw
        response headers are stored in `cls._rcode` and
        `cls._response_headers`.
        """
        url = cls.getDOHGetURL(baseurl, query, rawQuery)
        return cls._performDOHExchange(port, servername, url, None, response, timeout, caFile, useQueue, rawResponse, customHeaders, useHTTPS)

    @classmethod
    def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, rawResponse=False, customHeaders=None, useHTTPS=True):
        """Send `query` as the body of an HTTP POST to `baseurl`.

        Parameters and return value are the same as for sendDOHQuery().
        """
        body = query if rawQuery else query.to_wire()
        return cls._performDOHExchange(port, servername, baseurl, body, response, timeout, caFile, useQueue, rawResponse, customHeaders, useHTTPS)

    def getHeaderValue(self, name):
        """Return the value of response header `name`, or None if absent.

        The lookup is case-insensitive and uses the headers of the last
        exchange.  Only the first colon separates the name from the value,
        so values that themselves contain a colon (URLs, for example) are
        returned whole; lines without a colon, like the status line, are
        skipped.
        """
        wanted = name.lower()
        for header in self._response_headers.decode().splitlines(False):
            key, separator, value = header.partition(':')
            if separator and key.lower() == wanted:
                return value.strip()
        return None

    def checkHasHeader(self, name, value):
        """Assert that response header `name` has exactly the value `value`."""
        got = self.getHeaderValue(name)
        self.assertEqual(got, value)

    def checkNoHeader(self, name):
        """Assert that response header `name` is not present."""
        self.checkHasHeader(name, None)

    @classmethod
    def setUpClass(cls):
        """Start the responders and dnsdist, then set up the sockets."""

        # for some reason, @unittest.skipIf() is not applied to derived classes with some versions of Python
        if 'SKIP_DOH_TESTS' in os.environ:
            raise unittest.SkipTest('DNS over HTTPS tests are disabled')

        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")
140
class TestDOH(DNSDistDOHTest):
    """DoH frontend tests: plain queries, rules, HTTP actions and early responses."""

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    # header lines expected in regular DoH responses (see customResponseHeaders below)
    _customResponseHeader1 = 'access-control-allow-origin: *'
    _customResponseHeader2 = 'user-agent: derp'
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    # dnsdist configuration; the placeholders are filled from _config_params, in order
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/", "/coffee", "/PowerDNS", "/PowerDNS2", "/PowerDNS-999" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp",["UPPERCASE"]="VaLuE"}})
    dohFE = getDOHFrontend(0)
    dohFE:setResponsesMap({newDOHResponseMapEntry('^/coffee$', 418, 'C0FFEE', {['FoO']='bar'})})

    addAction("drop.doh.tests.powerdns.com.", DropAction())
    addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
    addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
    addAction(HTTPPathRegexRule("^/PowerDNS-[0-9]"), SpoofAction("6.7.8.9"))
    addAction("http-status-action.doh.tests.powerdns.com.", HTTPStatusAction(200, "Plaintext answer", "text/plain"))
    addAction("http-status-action-redirect.doh.tests.powerdns.com.", HTTPStatusAction(307, "https://doh.powerdns.org"))

    function dohHandler(dq)
      if dq:getHTTPScheme() == 'https' and dq:getHTTPHost() == '%s:%d' and dq:getHTTPPath() == '/' and dq:getHTTPQueryString() == '' then
        local foundct = false
        for key,value in pairs(dq:getHTTPHeaders()) do
          if key == 'content-type' and value == 'application/dns-message' then
            foundct = true
            break
          end
        end
        if foundct then
          dq:setHTTPResponse(200, 'It works!', 'text/plain')
          dq.dh:setQR(true)
          return DNSAction.HeaderModify
        end
      end
      return DNSAction.None
    end
    addAction("http-lua.doh.tests.powerdns.com.", LuaAction(dohHandler))
    """
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']

    def testDOHSimple(self):
        """
        DOH: Simple query
        """
        name = 'simple.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)

        headers = self._response_headers.decode()
        self.assertIn(self._customResponseHeader1, headers)
        self.assertIn(self._customResponseHeader2, headers)
        # the configured 'UPPERCASE' header name is expected in lower case
        self.assertNotIn('UPPERCASE: VaLuE', headers)
        self.assertIn('uppercase: VaLuE', headers)
        self.assertIn('cache-control: max-age=3600', headers)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkHasHeader('cache-control', 'max-age=3600')

    def testDOHSimplePOST(self):
        """
        DOH: Simple POST query
        """
        name = 'simple-post.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH: Existing EDNS
        """
        name = 'existing-edns.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithoutECS(query, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(query, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)

    def testDropped(self):
        """
        DOH: Dropped query
        """
        name = 'drop.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, None)

    def testRefused(self):
        """
        DOH: Refused
        """
        name = 'refused.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        expectedResponse.set_rcode(dns.rcode.REFUSED)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testSpoof(self):
        """
        DOH: Spoofed
        """
        name = 'spoof.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '1.2.3.4')
        expectedResponse.answer.append(rrset)

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

    def testDOHInvalid(self):
        """
        DOH: Invalid query
        """
        name = 'invalid.doh.tests.powerdns.com.'
        invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        invalidQuery.id = 0
        # first an invalid query: a wire-format query missing its last 5 bytes
        invalidQuery = invalidQuery.to_wire()
        invalidQuery = invalidQuery[:-5]
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
        self.assertEqual(receivedResponse, None)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHWithoutQuery(self):
        """
        DOH: Empty GET query
        """
        # a GET on the base URL, without any 'dns' parameter
        conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
        try:
            conn.setopt(pycurl.URL, self._dohBaseURL)
            conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
            conn.setopt(pycurl.SSL_VERIFYPEER, 1)
            conn.setopt(pycurl.SSL_VERIFYHOST, 2)
            conn.setopt(pycurl.CAINFO, self._caCert)
            conn.perform_rb()
            rcode = conn.getinfo(pycurl.RESPONSE_CODE)
        finally:
            conn.close()
        self.assertEqual(rcode, 400)

    def testDOHEmptyPOST(self):
        """
        DOH: Empty POST query
        """
        name = 'empty-post.doh.tests.powerdns.com.'

        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
        self.assertEqual(receivedResponse, None)

        # and now a valid one
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)
        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHeaderRule(self):
        """
        DOH: HeaderRule
        """
        name = 'header-rule.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '2.3.4.5')
        expectedResponse.answer.append(rrset)

        # this header should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False, customHeaders=['x-powerdnS: aaaaa'])
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.flags &= ~dns.flags.RD
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        # this content of the header should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-powerdnS: bbbbb'])
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPPath(self):
        """
        DOH: HTTPPath
        """
        name = 'http-path.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '3.4.5.6')
        expectedResponse.answer.append(rrset)

        # this path should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS', caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        expectedQuery.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        # this path should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

        # this path is not in the URLs map and should lead to a 404
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS/something", query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        self.assertEqual(receivedResponse, b'there is no endpoint configured for this path')
        self.assertEqual(self._rcode, 404)

    def testHTTPPathRegex(self):
        """
        DOH: HTTPPathRegex
        """
        name = 'http-path-regex.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')
        query.id = 0
        query.flags &= ~dns.flags.RD
        expectedResponse = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '6.7.8.9')
        expectedResponse.answer.append(rrset)

        # this path should match
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS-999', caFile=self._caCert, query=query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, expectedResponse)

        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        expectedQuery.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        # this path should NOT match
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testHTTPStatusAction200(self):
        """
        DOH: HTTPStatusAction 200 OK
        """
        name = 'http-status-action.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        self.assertEqual(receivedResponse, b'Plaintext answer')
        self.assertEqual(self._rcode, 200)
        self.assertIn('content-type: text/plain', self._response_headers.decode())

    def testHTTPStatusAction307(self):
        """
        DOH: HTTPStatusAction 307
        """
        name = 'http-status-action-redirect.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        self.assertEqual(self._rcode, 307)
        self.assertIn('location: https://doh.powerdns.org', self._response_headers.decode())

    def testHTTPLuaResponse(self):
        """
        DOH: Lua HTTP Response
        """
        name = 'http-lua.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        # POST, so that the query string seen by dohHandler() is empty
        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        self.assertEqual(receivedResponse, b'It works!')
        self.assertEqual(self._rcode, 200)
        self.assertIn('content-type: text/plain', self._response_headers.decode())

    def testHTTPEarlyResponse(self):
        """
        DOH: HTTP Early Response
        """
        url = self._dohBaseURL + 'coffee'

        # the responses map entry for /coffee is expected to answer both a
        # plain GET and an empty POST, without the regular custom headers
        for usePost in (False, True):
            response_headers = BytesIO()
            conn = self.openDOHConnection(self._dohServerPort, caFile=self._caCert, timeout=2.0)
            try:
                conn.setopt(pycurl.URL, url)
                conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
                conn.setopt(pycurl.SSL_VERIFYPEER, 1)
                conn.setopt(pycurl.SSL_VERIFYHOST, 2)
                conn.setopt(pycurl.CAINFO, self._caCert)
                conn.setopt(pycurl.HEADERFUNCTION, response_headers.write)
                if usePost:
                    conn.setopt(pycurl.POST, True)
                    conn.setopt(pycurl.POSTFIELDS, '')

                data = conn.perform_rb()
                rcode = conn.getinfo(pycurl.RESPONSE_CODE)
            finally:
                conn.close()
            headers = response_headers.getvalue().decode()

            self.assertEqual(rcode, 418)
            self.assertEqual(data, b'C0FFEE')
            self.assertIn('foo: bar', headers)
            self.assertNotIn(self._customResponseHeader2, headers)
623
class TestDOHAddingECS(DNSDistDOHTest):
    """DoH tests with a backend using `useClientSubnet=true` and `setECSOverride(true)`."""

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    # dnsdist configuration; the placeholders are filled from _config_params, in order
    _config_template = """
    newServer{address="127.0.0.1:%s", useClientSubnet=true}
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
    setECSOverride(true)
    """
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHSimple(self):
        """
        DOH with ECS: Simple query
        """
        name = 'simple.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        # the backend is expected to see an ECS option for 127.0.0.0/24
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHExistingEDNS(self):
        """
        DOH with ECS: Existing EDNS
        """
        name = 'existing-edns.doh-ecs.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192)
        query.id = 0
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=8192, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithoutECS(response, receivedResponse)

    def testDOHExistingECS(self):
        """
        DOH with ECS: Existing EDNS Client Subnet
        """
        name = 'existing-ecs.doh-ecs.tests.powerdns.com.'
        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.4')
        rewrittenEcso = clientsubnetoption.ClientSubnetOption('127.0.0.0', 24)
        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[ecso], want_dnssec=True)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[rewrittenEcso])
        response = dns.message.make_response(query)
        response.use_edns(edns=True, payload=4096, options=[rewrittenEcso])
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
        self.checkResponseEDNSWithECS(response, receivedResponse)
718
class TestDOHOverHTTP(DNSDistDOHTest):
    """DoH tests against a frontend configured without certificate (plain HTTP)."""

    _dohServerPort = 8480
    _serverName = 'tls.tests.dnsdist.org'
    _dohBaseURL = ("http://%s:%d/dns-query" % (_serverName, _dohServerPort))
    # dnsdist configuration; the placeholders are filled from _config_params, in order
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addDOHLocal("127.0.0.1:%s")
    """
    _config_params = ['_testServerPort', '_dohServerPort']
    # expected output of the configuration check; the continuation lines
    # start at column 0 on purpose, they are part of the expected bytes
    _checkConfigExpectedOutput = b"""No certificate provided for DoH endpoint 127.0.0.1:8480, running in DNS over HTTP mode instead of DNS over HTTPS
Configuration 'configs/dnsdist_TestDOHOverHTTP.conf' OK!
"""

    def testDOHSimple(self):
        """
        DOH over HTTP: Simple query
        """
        name = 'simple.doh-over-http.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, useHTTPS=False)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        expectedQuery.id = receivedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)

    def testDOHSimplePOST(self):
        """
        DOH over HTTP: Simple POST query
        """
        name = 'simple-post.doh-over-http.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.A, '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, useHTTPS=False)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkResponseNoEDNS(response, receivedResponse)
783
class TestDOHWithCache(DNSDistDOHTest):
    """DoH tests with a packet cache attached to the default pool."""

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/dns-query" % (_serverName, _dohServerPort))
    # dnsdist configuration; the placeholders are filled from _config_params, in order
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addDOHLocal("127.0.0.1:%s", "%s", "%s")

    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    """
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHCacheLargeAnswer(self):
        """
        DOH with cache: Check that we can cache (and retrieve) large answers
        """
        numberOfQueries = 10
        name = 'large.doh-with-cache.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        # we prepare a large answer: 44 comma-separated chunks, each made of
        # a number repeated 50 times
        content = ', '.join(str(i) * 50 for i in range(44))
        # pad up to 4096
        content = content + 'A' * 40

        rrset = dns.rrset.from_text(name, 3600, dns.rdataclass.IN, dns.rdatatype.TXT, content)
        response.answer.append(rrset)
        self.assertEqual(len(response.to_wire()), 4096)

        # first query to fill the cache
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.checkHasHeader('cache-control', 'max-age=3600')

        # no response is queued from now on, so these can only be answered
        # without involving the backend
        for _attempt in range(numberOfQueries):
            (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False)
            self.assertEqual(receivedResponse, response)
            self.checkHasHeader('cache-control', 'max-age=' + str(receivedResponse.answer[0].ttl))

        time.sleep(1)

        # the max-age value must keep following the TTL of the answer
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False)
        self.assertEqual(receivedResponse, response)
        self.checkHasHeader('cache-control', 'max-age=' + str(receivedResponse.answer[0].ttl))
847 (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False)
848 self.assertEquals(receivedResponse, response)
849 self.checkHasHeader('cache-control', 'max-age=' + str(receivedResponse.answer[0].ttl))
850
class TestDOHWithoutCacheControl(DNSDistDOHTest):
    """
    DOH frontend configured with sendCacheControlHeaders=false.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {sendCacheControlHeaders=false})
    """
    # one entry per placeholder of _config_template, in order
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHSimple(self):
        """
        DOH without cache-control
        """
        name = 'simple.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        # assertEqual rather than the deprecated assertEquals alias, which
        # has been removed from unittest in Python 3.12
        self.assertEqual(expectedQuery, receivedQuery)
        # the point of this test: no cache-control header in the response
        self.checkNoHeader('cache-control')
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)
891
class TestDOHFFI(DNSDistDOHTest):
    """
    DOH frontend with a Lua FFI action setting a custom HTTP response.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _customResponseHeader1 = 'access-control-allow-origin: *'
    _customResponseHeader2 = 'user-agent: derp'
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {customResponseHeaders={["access-control-allow-origin"]="*",["user-agent"]="derp",["UPPERCASE"]="VaLuE"}})

    local ffi = require("ffi")

    function dohHandler(dq)
      local scheme = ffi.string(ffi.C.dnsdist_ffi_dnsquestion_get_http_scheme(dq))
      local host = ffi.string(ffi.C.dnsdist_ffi_dnsquestion_get_http_host(dq))
      local path = ffi.string(ffi.C.dnsdist_ffi_dnsquestion_get_http_path(dq))
      local query_string = ffi.string(ffi.C.dnsdist_ffi_dnsquestion_get_http_query_string(dq))
      if scheme == 'https' and host == '%s:%d' and path == '/' and query_string == '' then
        local foundct = false
        local headers_ptr = ffi.new("const dnsdist_ffi_http_header_t *[1]")
        local headers_ptr_param = ffi.cast("const dnsdist_ffi_http_header_t **", headers_ptr)

        local headers_count = tonumber(ffi.C.dnsdist_ffi_dnsquestion_get_http_headers(dq, headers_ptr_param))
        if headers_count > 0 then
          for idx = 0, headers_count-1 do
            if ffi.string(headers_ptr[0][idx].name) == 'content-type' and ffi.string(headers_ptr[0][idx].value) == 'application/dns-message' then
              foundct = true
              break
            end
          end
        end
        if foundct then
          ffi.C.dnsdist_ffi_dnsquestion_set_http_response(dq, 200, 'It works!', 'text/plain')
          return DNSAction.HeaderModify
        end
      end
      return DNSAction.None
    end
    addAction("http-lua-ffi.doh.tests.powerdns.com.", LuaFFIAction(dohHandler))
    """
    # one entry per placeholder of _config_template, in order; the last
    # two fill the host:port compared against in dohHandler()
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_serverName', '_dohServerPort']

    def testHTTPLuaFFIResponse(self):
        """
        DOH: Lua FFI HTTP Response
        """
        name = 'http-lua-ffi.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        # rawResponse=True because the expected body is the plain text set
        # by dohHandler(), not a DNS message
        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True)
        self.assertTrue(receivedResponse)
        # assertEqual rather than the deprecated assertEquals alias, which
        # has been removed from unittest in Python 3.12
        self.assertEqual(receivedResponse, b'It works!')
        self.assertEqual(self._rcode, 200)
        # assertIn reports the received headers on failure
        self.assertIn('content-type: text/plain', self._response_headers.decode())
952
class TestDOHForwardedFor(DNSDistDOHTest):
    """
    DOH frontend with trustForwardedForHeader=true and an ACL that only
    allows 192.0.2.1/32.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    setACL('192.0.2.1/32')
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, {trustForwardedForHeader=true})
    """
    # one entry per placeholder of _config_template, in order
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHAllowedForwarded(self):
        """
        DOH with X-Forwarded-For allowed
        """
        name = 'allowed.forwarded.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0
        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
        expectedQuery.id = 0
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # the header ends with 192.0.2.1, the only address allowed by the ACL
        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-forwarded-for: 127.0.0.1:42, 127.0.0.1, 192.0.2.1:4200'])
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = expectedQuery.id
        # assertEqual rather than the deprecated assertEquals alias, which
        # has been removed from unittest in Python 3.12
        self.assertEqual(expectedQuery, receivedQuery)
        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testDOHDeniedForwarded(self):
        """
        DOH with X-Forwarded-For not allowed
        """
        name = 'not-allowed.forwarded.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        # None of the addresses in the header is allowed by the ACL, so a
        # 403 is expected. No response is handed to sendDOHQuery(): it
        # would be queued for the responder, and this test never expects
        # the backend to be reached.
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True, customHeaders=['x-forwarded-for: 127.0.0.1:42, 127.0.0.1'])

        self.assertEqual(self._rcode, 403)
        self.assertEqual(receivedResponse, b'dns query not allowed because of ACL')
1015
class TestDOHForwardedForNoTrusted(DNSDistDOHTest):
    """
    DOH frontend without trustForwardedForHeader and an ACL that only
    allows 192.0.2.1/32.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _dohServerPort = 8443
    _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
    _config_template = """
    newServer{address="127.0.0.1:%s"}

    setACL('192.0.2.1/32')
    addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
    """
    # one entry per placeholder of _config_template, in order
    _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']

    def testDOHForwardedUntrusted(self):
        """
        DOH with X-Forwarded-For not trusted
        """
        name = 'not-trusted.forwarded.doh.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        # The header carries an address allowed by the ACL, but the
        # frontend is not configured to trust it, so a 403 is expected.
        # No response is handed to sendDOHQuery(): it would be queued for
        # the responder, and this test never expects the backend to be
        # reached.
        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, useQueue=False, rawResponse=True, customHeaders=['x-forwarded-for: 192.0.2.1:4200'])

        # assertEqual rather than the deprecated assertEquals alias, which
        # has been removed from unittest in Python 3.12
        self.assertEqual(self._rcode, 403)
        self.assertEqual(receivedResponse, b'dns query not allowed because of ACL')