]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add regression tests for packet cache and EDNS padding
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 3 Sep 2025 07:23:20 +0000 (09:23 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 3 Sep 2025 07:23:20 +0000 (09:23 +0200)
Signed-off-by: Remi Gacogne <remi.gacogne@powerdns.com>
regression-tests.dnsdist/paddingoption.py [new symlink]
regression-tests.dnsdist/randompaddingoption.py [new file with mode: 0644]
regression-tests.dnsdist/test_CachePadding.py [new file with mode: 0644]
regression-tests.recursor-dnssec/paddingoption.py

diff --git a/regression-tests.dnsdist/paddingoption.py b/regression-tests.dnsdist/paddingoption.py
new file mode 120000 (symlink)
index 0000000..e4ef68e
--- /dev/null
@@ -0,0 +1 @@
+../regression-tests.recursor-dnssec/paddingoption.py
\ No newline at end of file
diff --git a/regression-tests.dnsdist/randompaddingoption.py b/regression-tests.dnsdist/randompaddingoption.py
new file mode 100644 (file)
index 0000000..b8a69cf
--- /dev/null
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+import os
+import paddingoption
+
+class RandomPaddingOption(paddingoption.PaddingOption):
+    """Implementation of rfc7830 using random bytes in the payload.
+    """
+
+    def __init__(self, numberOfBytes):
+        super(RandomPaddingOption, self).__init__(12)
+        self.numberOfBytes = numberOfBytes
+
+    def to_wire(self, file=None):
+        """Create EDNS packet as defined in rfc7830 using random bytes in the payload."""
+
+        payload = os.urandom(self.numberOfBytes)
+        if file:
+            file.write(payload)
+        else:
+            return payload
diff --git a/regression-tests.dnsdist/test_CachePadding.py b/regression-tests.dnsdist/test_CachePadding.py
new file mode 100644 (file)
index 0000000..3dc969d
--- /dev/null
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+import dns
+import paddingoption
+import randompaddingoption
+from dnsdisttests import DNSDistTest, pickAvailablePort
+
+class TestCachePadding(DNSDistTest):
+
+    _config_template = """
+    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
+    getPool(""):setCache(pc)
+    newServer{address="127.0.0.1:%d"}
+    """
+
+    def testCached(self):
+        """
+        Cache padding
+        """
+        name = 'padding.cache-padding.tests.powerdns.com.'
+        po = paddingoption.PaddingOption(64)
+        query = dns.message.make_query(name, 'A', want_dnssec=True, options=[po])
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.AAAA,
+                                    '::1')
+        response.answer.append(rrset)
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEqual(query, receivedQuery)
+        self.assertEqual(receivedResponse, response)
+
+        # identical query, should be cached
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEqual(receivedResponse, response)
+
+        # generate a new padding payload, with random bytes
+        rpo = randompaddingoption.RandomPaddingOption(64)
+        query = dns.message.make_query(name, 'A', want_dnssec=True, options=[rpo])
+        response = dns.message.make_response(query)
+        response.answer.append(rrset)
+
+        # identical query except for the padding content which should be skipped, should be cached
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEqual(receivedResponse, response)
+
+class TestCacheNotSkippingPadding(DNSDistTest):
+
+    _config_template = """
+    -- only skip EDNS cookies, not padding
+    pc = newPacketCache(100, {maxTTL=86400, minTTL=1, skipOptions={10}})
+    getPool(""):setCache(pc)
+    newServer{address="127.0.0.1:%d"}
+    """
+
+    def testCached(self):
+        """
+        Cache padding: not skipping the padding
+        """
+        name = 'not-skipping-padding.cache-padding.tests.powerdns.com.'
+        po = paddingoption.PaddingOption(64)
+        query = dns.message.make_query(name, 'A', want_dnssec=True, options=[po])
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.AAAA,
+                                    '::1')
+        response.answer.append(rrset)
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEqual(query, receivedQuery)
+        self.assertEqual(receivedResponse, response)
+
+        # identical query, should be cached
+        (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+        self.assertEqual(receivedResponse, response)
+
+        # generate a new padding payload, with random bytes
+        rpo = randompaddingoption.RandomPaddingOption(64)
+        query = dns.message.make_query(name, 'A', want_dnssec=True, options=[rpo])
+        response = dns.message.make_response(query)
+
+        # identical query except for the padding content which should NOT be skipped, should NOT be cached
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEqual(query, receivedQuery)
+        self.assertEqual(receivedResponse, response)
index 728da93a7d092b884c5f225833b25e4d17f6b9ad..db541f70c1089c64730f018441665934e9641a93 100644 (file)
@@ -47,6 +47,9 @@ class PaddingOption(dns.edns.Option):
             self.numberOfBytes
         )
 
+    def to_text(self):
+        return self.__repr__()
+
     def __eq__(self, other):
         if not isinstance(other, PaddingOption):
             return False