receivedQuery.id = query.id
self.assertEqual(receivedQuery, query)
self.assertEqual(receivedResponse, response)
+
+class TestDOHLimits(DNSDistDOHTest):
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _dohServerPort = 8443
+ _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _maxTCPConnsPerClient = 3
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" })
+ setMaxTCPConnectionsPerClient(%s)
+ """
+ _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey', '_maxTCPConnsPerClient']
+
+ def testConnsPerClient(self):
+ """
+ DoH Limits: Maximum number of conns per client
+ """
+ name = 'maxconnsperclient.doh.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ url = self.getDOHGetURL(self._dohBaseURL, query)
+ conns = []
+
+ for idx in range(self._maxTCPConnsPerClient + 1):
+ conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
+ conn.setopt(pycurl.URL, url)
+ conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
+ conn.setopt(pycurl.SSL_VERIFYPEER, 1)
+ conn.setopt(pycurl.SSL_VERIFYHOST, 2)
+ conn.setopt(pycurl.CAINFO, self._caCert)
+ conns.append(conn)
+
+ count = 0
+ failed = 0
+ for conn in conns:
+ try:
+ data = conn.perform_rb()
+ rcode = conn.getinfo(pycurl.RESPONSE_CODE)
+ count = count + 1
+ except:
+ failed = failed + 1
+
+ for conn in conns:
+ conn.close()
+
+ # wait a bit to be sure that dnsdist closed the connections
+ # and decremented the counters on its side, otherwise subsequent
+ # connections will be dropped
+ time.sleep(1)
+
+ self.assertEqual(count, self._maxTCPConnsPerClient)
+ self.assertEqual(failed, 1)