self.assertTrue(dropped)
+
class TestDOHForwardedForNoTrustedNGHTTP2(DOHForwardedForNoTrusted, DNSDistDOHTest):
- _dohLibrary = 'nghttp2'
+ _dohLibrary = "nghttp2"
+
+ class DOHDelayedACL(DNSDistDOHTest):
+
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _dohServerPort = pickAvailablePort()
+ _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
+ _dohLibrary = 'nghttp2'
+ _yaml_config_template = """
+ acl:
+ - "192.0.2.1/32"
+ backends:
+ - address: "127.0.0.1:%d"
+ protocol: "Do53"
+ binds:
+ - listen_address: "127.0.0.1:%d"
+ reuseport: true
+ protocol: "DoH"
+ tls:
+ certificates:
+ - certificate: "%s"
+ key: "%s"
+ doh:
+ provider: "%s"
+ paths:
+ - "/"
+ early_acl_drop: false
+ """
+ _yaml_config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_dohLibrary']
+ _config_params = []
+ _verboseMode = True
+
+ def testDOHDelayedACL(self):
+ """
+ DOH: Delayed ACL check
+ """
+ name = 'delayed-acl-drop.doh.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ query.id = 0
+ expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+ expectedQuery.id = 0
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, useQueue=False, rawResponse=True)
+ self.assertEqual(self._rcode, 403)
+ self.assertEqual(receivedResponse, b'DoH query not allowed because of ACL')
+
class DOHFrontendLimits(object):
-
# this test suite uses a different responder port
# because it uses a different health check configuration
_testServerPort = pickAvailablePort()