]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add regression for destination address harvesting with QUIC
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 19 Apr 2024 14:37:43 +0000 (16:37 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 19 Apr 2024 14:41:32 +0000 (16:41 +0200)
regression-tests.dnsdist/quictests.py
regression-tests.dnsdist/test_DOH3.py
regression-tests.dnsdist/test_DOQ.py

index 62cf24e757a47bfa63149d5c1352d0e6b9fa4a2f..743de28db561eab5374d23f7ff6dd08af8e3ebb5 100644 (file)
@@ -169,3 +169,25 @@ class QUICWithCacheTests(object):
             total += self._responsesCounter[key]
 
         self.assertEqual(total, 1)
+
+class QUICGetLocalAddressOnAnyBindTests(object):
+
+    def testGetLocalAddressOnAnyBind(self):
+        """
+        QUIC: Return CNAME containing the local address for an ANY bind
+        """
+        name = 'local-address-any.quic.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        # dnsdist set RA = RD for spoofed responses
+        query.flags &= ~dns.flags.RD
+
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.CNAME,
+                                    'address-was-127-0-0-1.local-address-any.advanced.tests.powerdns.com.')
+        response.answer.append(rrset)
+
+        (_, receivedResponse) = self.sendQUICQuery(query, response=None, useQueue=False)
+        self.assertEqual(receivedResponse, response)
index 4704c26901e292afffb2fddb6536f03e8eb0bd8f..4a91c433f25ea962a6e451a3e998b92415681cd2 100644 (file)
@@ -4,7 +4,7 @@ import clientsubnetoption
 
 from dnsdisttests import DNSDistTest
 from dnsdisttests import pickAvailablePort
-from quictests import QUICTests, QUICWithCacheTests, QUICACLTests
+from quictests import QUICTests, QUICWithCacheTests, QUICACLTests, QUICGetLocalAddressOnAnyBindTests
 import doh3client
 
 class TestDOH3(QUICTests, DNSDistTest):
@@ -92,3 +92,33 @@ class TestDOH3Specifics(DNSDistTest):
         receivedQuery.id = expectedQuery.id
         self.assertEqual(expectedQuery, receivedQuery)
         self.assertEqual(receivedResponse, response)
+
+class TestDOH3GetLocalAddressOnAnyBind(QUICGetLocalAddressOnAnyBindTests, DNSDistTest):
+    _serverKey = 'server.key'
+    _serverCert = 'server.chain'
+    _serverName = 'tls.tests.dnsdist.org'
+    _caCert = 'ca.pem'
+    _doqServerPort = pickAvailablePort()
+    _dohBaseURL = ("https://%s:%d/" % (_serverName, _doqServerPort))
+    _config_template = """
+    function answerBasedOnLocalAddress(dq)
+      local dest = tostring(dq.localaddr)
+      local i, j = string.find(dest, "[0-9.]+")
+      local addr = string.sub(dest, i, j)
+      local dashAddr = string.gsub(addr, "[.]", "-")
+      return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
+    end
+    addAction("local-address-any.quic.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
+    newServer{address="127.0.0.1:%s"}
+    addDOH3Local("0.0.0.0:%d", "%s", "%s")
+    addDOH3Local("[::]:%d", "%s", "%s")
+    """
+    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey', '_doqServerPort','_serverCert', '_serverKey']
+    _acl = ['127.0.0.1/32', '::1/128']
+    _skipListeningOnCL = True
+
+    def getQUICConnection(self):
+        return self.getDOQConnection(self._doqServerPort, self._caCert)
+
+    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
+        return self.sendDOH3Query(self._doqServerPort, self._dohBaseURL, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
index 9af5d8a9387bfceebd462feeea7a1ad59fdb599b..657df001a9931a9fa1b13e678364b5de9bf6c0f5 100644 (file)
@@ -6,7 +6,7 @@ import clientsubnetoption
 from dnsdisttests import DNSDistTest
 from dnsdisttests import pickAvailablePort
 from doqclient import quic_bogus_query
-from quictests import QUICTests, QUICWithCacheTests, QUICACLTests
+from quictests import QUICTests, QUICWithCacheTests, QUICACLTests, QUICGetLocalAddressOnAnyBindTests
 import doqclient
 from doqclient import quic_query
 
@@ -142,3 +142,32 @@ class TestDOQCertificateReloading(DNSDistTest):
         (_, secondSerial) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
         # check that the serial is different
         self.assertNotEqual(serial, secondSerial)
+
+class TestDOQGetLocalAddressOnAnyBind(QUICGetLocalAddressOnAnyBindTests, DNSDistTest):
+    _serverKey = 'server.key'
+    _serverCert = 'server.chain'
+    _serverName = 'tls.tests.dnsdist.org'
+    _caCert = 'ca.pem'
+    _doqServerPort = pickAvailablePort()
+    _config_template = """
+    function answerBasedOnLocalAddress(dq)
+      local dest = tostring(dq.localaddr)
+      local i, j = string.find(dest, "[0-9.]+")
+      local addr = string.sub(dest, i, j)
+      local dashAddr = string.gsub(addr, "[.]", "-")
+      return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
+    end
+    addAction("local-address-any.quic.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
+    newServer{address="127.0.0.1:%s"}
+    addDOQLocal("0.0.0.0:%d", "%s", "%s")
+    addDOQLocal("[::]:%d", "%s", "%s")
+    """
+    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey', '_doqServerPort','_serverCert', '_serverKey']
+    _acl = ['127.0.0.1/32', '::1/128']
+    _skipListeningOnCL = True
+
+    def getQUICConnection(self):
+        return self.getDOQConnection(self._doqServerPort, self._caCert)
+
+    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
+        return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)