]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
rec: Test that the "zero scope" option doesn't exceed the maximum payload size
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 17 Nov 2020 10:21:14 +0000 (11:21 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 18 Nov 2020 09:08:56 +0000 (10:08 +0100)
When use-incoming-edns-subnet is enabled, the query has an ECS option,
and the answer was not variable, we do return a 0-scoped ECS answer,
to make it possible to the client (dnsdist for example) to cache the
response and serve it to all clients.
Still we need to make sure that adding the 0-scoped ECS option does not
exceed the client EDNS UDP payload size.

regression-tests.recursor-dnssec/test_ECS.py

index f732fc868a7f062694cc98586936d40226ba5c2c..9e0bd47d631a49c6c5c6e0243a6a4c59d360ece7 100644 (file)
@@ -32,7 +32,7 @@ loglevel=9
 disable-syslog=yes
 """
 
-    def sendECSQuery(self, query, expected, expectedFirstTTL=None):
+    def sendECSQuery(self, query, expected, expectedFirstTTL=None, scopeZeroResponse=None):
         res = self.sendUDPQuery(query)
 
         self.assertRcodeEqual(res, dns.rcode.NOERROR)
@@ -42,6 +42,18 @@ disable-syslog=yes
             self.assertEqual(res.answer[0].ttl, expectedFirstTTL)
         else:
             expectedFirstTTL = res.answer[0].ttl
+        self.assertEquals(res.edns, query.edns)
+
+        if scopeZeroResponse is not None:
+            self.assertEqual(res.edns, 0)
+            if scopeZeroResponse:
+                self.assertEqual(len(res.options), 1)
+                self.assertEqual(res.options[0].otype, 8)
+                self.assertEqual(res.options[0].scope, 0)
+            else:
+                self.assertEqual(len(res.options), 1)
+                self.assertEqual(res.options[0].otype, 8)
+                self.assertNotEqual(res.options[0].scope, 0)
 
         # wait one second, check that the TTL has been
         # decreased indicating a cache hit
@@ -52,6 +64,7 @@ disable-syslog=yes
         self.assertRcodeEqual(res, dns.rcode.NOERROR)
         self.assertRRsetInAnswer(res, expected)
         self.assertLess(res.answer[0].ttl, expectedFirstTTL)
+        self.assertEquals(res.edns, query.edns)
 
     def checkECSQueryHit(self, query, expected):
         res = self.sendUDPQuery(query)
@@ -139,7 +152,7 @@ forward-zones=ecs-echo.example=%s.21
 
         ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
         query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
-        self.sendECSQuery(query, expected)
+        self.sendECSQuery(query, expected, scopeZeroResponse=True)
 
     def testNoECS(self):
         expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
@@ -152,7 +165,7 @@ forward-zones=ecs-echo.example=%s.21
 
         ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
         query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
-        self.sendECSQuery(query, expected)
+        self.sendECSQuery(query, expected, scopeZeroResponse=True)
 
 class testECSByName(ECSTest):
     _confdir = 'ECSByName'
@@ -499,6 +512,77 @@ forward-zones=ecs-echo.example=%s.21
         query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
         self.sendECSQuery(query, expected)
 
+class testTooLargeToAddZeroScope(RecursorTest):
+
+    _confdir = 'TooLargeToAddZeroScope'
+    _config_template_default = """
+use-incoming-edns-subnet=yes
+dnssec=validate
+daemon=no
+trace=yes
+packetcache-ttl=0
+packetcache-servfail-ttl=0
+max-cache-ttl=15
+threads=1
+loglevel=9
+disable-syslog=yes
+log-common-errors=yes
+"""
+    _config_template = """
+    """
+    _lua_dns_script_file = """
+    function preresolve(dq)
+      if dq.qname == newDN('toolarge.ecs.') then
+        dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
+        return true
+      end
+      return false
+    end
+    """ % ('A'*447)
+
+    _roothints = None
+
+    @classmethod
+    def setUpClass(cls):
+
+        # we don't need all the auth stuff
+        cls.setUpSockets()
+        cls.startResponders()
+
+        confdir = os.path.join('configs', cls._confdir)
+        cls.createConfigDir(confdir)
+
+        cls.generateRecursorConfig(confdir)
+        cls.startRecursor(confdir, cls._recursorPort)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.tearDownRecursor()
+
+    @classmethod
+    def generateRecursorConfig(cls, confdir):
+        super(testTooLargeToAddZeroScope, cls).generateRecursorConfig(confdir)
+
+    def testTooLarge(self):
+        qname = 'toolarge.ecs.'
+        ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
+        query = dns.message.make_query(qname, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+
+        # should not have an ECS Option since the packet is too large already
+        res = self.sendUDPQuery(query, timeout=5.0)
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertEquals(len(res.answer), 1)
+        self.assertEqual(res.edns, 0)
+        self.assertEqual(len(res.options), 0)
+
+        res = self.sendTCPQuery(query, timeout=5.0)
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertEquals(len(res.answer), 1)
+        self.assertEqual(res.edns, 0)
+        self.assertEqual(len(res.options), 1)
+        self.assertEqual(res.options[0].otype, 8)
+        self.assertEqual(res.options[0].scope, 0)
+
 class UDPECSResponder(DatagramProtocol):
     @staticmethod
     def ipToStr(option):