From: Remi Gacogne
Date: Fri, 19 Apr 2024 14:37:43 +0000 (+0200)
Subject: dnsdist: Add regression for destination address harvesting with QUIC
X-Git-Tag: rec-5.1.0-alpha1~26^2~1
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=88913b8327e1cc443b9e22224e961c0fa47bfdb7;p=thirdparty%2Fpdns.git
dnsdist: Add regression for destination address harvesting with QUIC
---
diff --git a/regression-tests.dnsdist/quictests.py b/regression-tests.dnsdist/quictests.py
index 62cf24e757..743de28db5 100644
--- a/regression-tests.dnsdist/quictests.py
+++ b/regression-tests.dnsdist/quictests.py
@@ -169,3 +169,25 @@ class QUICWithCacheTests(object):
 total += self._responsesCounter[key]
 self.assertEqual(total, 1)
+
+class QUICGetLocalAddressOnAnyBindTests(object):
+
+ def testGetLocalAddressOnAnyBind(self):
+ """
+ QUIC: Return CNAME containing the local address for an ANY bind
+ """
+ name = 'local-address-any.quic.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ 'address-was-127-0-0-1.local-address-any.advanced.tests.powerdns.com.')
+ response.answer.append(rrset)
+
+ (_, receivedResponse) = self.sendQUICQuery(query, response=None, useQueue=False)
+ self.assertEqual(receivedResponse, response)
diff --git a/regression-tests.dnsdist/test_DOH3.py b/regression-tests.dnsdist/test_DOH3.py
index 4704c26901..4a91c433f2 100644
--- a/regression-tests.dnsdist/test_DOH3.py
+++ b/regression-tests.dnsdist/test_DOH3.py
@@ -4,7 +4,7 @@ import clientsubnetoption
 from dnsdisttests import DNSDistTest
 from dnsdisttests import pickAvailablePort
-from quictests import QUICTests, QUICWithCacheTests, QUICACLTests
+from quictests import QUICTests, QUICWithCacheTests, QUICACLTests, QUICGetLocalAddressOnAnyBindTests
 import doh3client
 class TestDOH3(QUICTests, DNSDistTest):
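Review bot: `testGetLocalAddressOnAnyBind` pins the expected CNAME to `address-was-127-0-0-1...`, so only the IPv4 destination is checked, although both subclasses added by this commit also configure a `[::]` listener. Destination-address harvesting for IPv6 normally goes through a separate socket option and code path, so a regression there would pass unnoticed. Either add a sibling test that sends the query over `::1` (whether the send helpers allow choosing the address is not visible here), or state the limit next to the assertion, e.g. `# IPv4 only: the [::] listener is configured but not exercised by this test`.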
@@ -92,3 +92,33 @@ class TestDOH3Specifics(DNSDistTest):
 receivedQuery.id = expectedQuery.id
 self.assertEqual(expectedQuery, receivedQuery)
 self.assertEqual(receivedResponse, response)
+
+class TestDOH3GetLocalAddressOnAnyBind(QUICGetLocalAddressOnAnyBindTests, DNSDistTest):
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _doqServerPort = pickAvailablePort()
+ _dohBaseURL = ("https://%s:%d/" % (_serverName, _doqServerPort))
+ _config_template = """
+ function answerBasedOnLocalAddress(dq)
+ local dest = tostring(dq.localaddr)
+ local i, j = string.find(dest, "[0-9.]+")
+ local addr = string.sub(dest, i, j)
+ local dashAddr = string.gsub(addr, "[.]", "-")
+ return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
+ end
+ addAction("local-address-any.quic.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
+ newServer{address="127.0.0.1:%s"}
+ addDOH3Local("0.0.0.0:%d", "%s", "%s")
+ addDOH3Local("[::]:%d", "%s", "%s")
+ """
+ _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey', '_doqServerPort','_serverCert', '_serverKey']
+ _acl = ['127.0.0.1/32', '::1/128']
+ _skipListeningOnCL = True
+
+ def getQUICConnection(self):
+ return self.getDOQConnection(self._doqServerPort, self._caCert)
+
+ def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
+ return self.sendDOH3Query(self._doqServerPort, self._dohBaseURL, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
diff --git a/regression-tests.dnsdist/test_DOQ.py b/regression-tests.dnsdist/test_DOQ.py
index 9af5d8a938..657df001a9 100644
--- a/regression-tests.dnsdist/test_DOQ.py
+++ b/regression-tests.dnsdist/test_DOQ.py
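Review bot: The Lua helper `answerBasedOnLocalAddress` in `_config_template` uses the result of `string.find(dest, "[0-9.]+")` unchecked, so it only produces a meaningful name for a dotted IPv4 destination even though the same template also adds a `[::]` listener. For an IPv6 `dq.localaddr` the pattern would presumably match only a digit fragment, and with no match at all `string.sub` receives nil indices and raises. A guard such as `if not i then return DNSAction.None end`, or a pattern that requires a full dotted quad, would make the limitation explicit instead of leaving it to a Lua error. The identical function is repeated in `TestDOQGetLocalAddressOnAnyBind` in test_DOQ.py; keeping the Lua text once, for example as a string attribute on `QUICGetLocalAddressOnAnyBindTests`, would stop the two copies from drifting apart.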
@@ -6,7 +6,7 @@ import clientsubnetoption
 from dnsdisttests import DNSDistTest
 from dnsdisttests import pickAvailablePort
 from doqclient import quic_bogus_query
-from quictests import QUICTests, QUICWithCacheTests, QUICACLTests
+from quictests import QUICTests, QUICWithCacheTests, QUICACLTests, QUICGetLocalAddressOnAnyBindTests
 import doqclient
 from doqclient import quic_query
@@ -142,3 +142,32 @@ class TestDOQCertificateReloading(DNSDistTest):
 (_, secondSerial) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
 # check that the serial is different
 self.assertNotEqual(serial, secondSerial)
+
+class TestDOQGetLocalAddressOnAnyBind(QUICGetLocalAddressOnAnyBindTests, DNSDistTest):
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _doqServerPort = pickAvailablePort()
+ _config_template = """
+ function answerBasedOnLocalAddress(dq)
+ local dest = tostring(dq.localaddr)
+ local i, j = string.find(dest, "[0-9.]+")
+ local addr = string.sub(dest, i, j)
+ local dashAddr = string.gsub(addr, "[.]", "-")
+ return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
+ end
+ addAction("local-address-any.quic.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
+ newServer{address="127.0.0.1:%s"}
+ addDOQLocal("0.0.0.0:%d", "%s", "%s")
+ addDOQLocal("[::]:%d", "%s", "%s")
+ """
+ _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey', '_doqServerPort','_serverCert', '_serverKey']
+ _acl = ['127.0.0.1/32', '::1/128']
+ _skipListeningOnCL = True
+
+ def getQUICConnection(self):
+ return self.getDOQConnection(self._doqServerPort, self._caCert)
+
+ def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
+ return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)