From: Remi Gacogne Date: Tue, 17 Nov 2020 10:21:14 +0000 (+0100) Subject: rec: Test that the "zero scope" option doesn't exceed the maximum payload size X-Git-Tag: auth-4.4.0-beta1~2^2~1 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=c86e11926b4f1e64bc020c8db388f9996d0f268e;p=thirdparty%2Fpdns.git rec: Test that the "zero scope" option doesn't exceed the maximum payload size When use-incoming-edns-subnet is enabled, the query has an ECS option, and the answer was not variable, we do return a 0-scoped ECS answer, to make it possible to the client (dnsdist for example) to cache the response and serve it to all clients. Still we need to make sure that adding the 0-scoped ECS option does not exceed the client EDNS UDP payload size. --- diff --git a/regression-tests.recursor-dnssec/test_ECS.py b/regression-tests.recursor-dnssec/test_ECS.py index f732fc868a..9e0bd47d63 100644 --- a/regression-tests.recursor-dnssec/test_ECS.py +++ b/regression-tests.recursor-dnssec/test_ECS.py @@ -32,7 +32,7 @@ loglevel=9 disable-syslog=yes """ - def sendECSQuery(self, query, expected, expectedFirstTTL=None): + def sendECSQuery(self, query, expected, expectedFirstTTL=None, scopeZeroResponse=None): res = self.sendUDPQuery(query) self.assertRcodeEqual(res, dns.rcode.NOERROR) @@ -42,6 +42,18 @@ disable-syslog=yes self.assertEqual(res.answer[0].ttl, expectedFirstTTL) else: expectedFirstTTL = res.answer[0].ttl + self.assertEquals(res.edns, query.edns) + + if scopeZeroResponse is not None: + self.assertEqual(res.edns, 0) + if scopeZeroResponse: + self.assertEqual(len(res.options), 1) + self.assertEqual(res.options[0].otype, 8) + self.assertEqual(res.options[0].scope, 0) + else: + self.assertEqual(len(res.options), 1) + self.assertEqual(res.options[0].otype, 8) + self.assertNotEqual(res.options[0].scope, 0) # wait one second, check that the TTL has been # decreased indicating a cache hit @@ -52,6 +64,7 @@ disable-syslog=yes self.assertRcodeEqual(res, dns.rcode.NOERROR) self.assertRRsetInAnswer(res, expected) self.assertLess(res.answer[0].ttl, expectedFirstTTL) + self.assertEquals(res.edns, query.edns) def checkECSQueryHit(self, query, expected): res = self.sendUDPQuery(query) @@ -139,7 +152,7 @@ forward-zones=ecs-echo.example=%s.21 ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32) query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512) - self.sendECSQuery(query, expected) + self.sendECSQuery(query, expected, scopeZeroResponse=True) def testNoECS(self): expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText) @@ -152,7 +165,7 @@ forward-zones=ecs-echo.example=%s.21 ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0) query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512) - self.sendECSQuery(query, expected) + self.sendECSQuery(query, expected, scopeZeroResponse=True) class testECSByName(ECSTest): _confdir = 'ECSByName' @@ -499,6 +512,77 @@ forward-zones=ecs-echo.example=%s.21 query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512) self.sendECSQuery(query, expected) +class testTooLargeToAddZeroScope(RecursorTest): + + _confdir = 'TooLargeToAddZeroScope' + _config_template_default = """ +use-incoming-edns-subnet=yes +dnssec=validate +daemon=no +trace=yes +packetcache-ttl=0 +packetcache-servfail-ttl=0 +max-cache-ttl=15 +threads=1 +loglevel=9 +disable-syslog=yes +log-common-errors=yes +""" + _config_template = """ + """ + _lua_dns_script_file = """ + function preresolve(dq) + if dq.qname == newDN('toolarge.ecs.') then + dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER) + return true + end + return false + end + """ % ('A'*447) + + _roothints = None + + @classmethod + def setUpClass(cls): + + # we don't need all the auth stuff + cls.setUpSockets() + cls.startResponders() + + confdir = os.path.join('configs', cls._confdir) + cls.createConfigDir(confdir) + + cls.generateRecursorConfig(confdir) + cls.startRecursor(confdir, cls._recursorPort) + + @classmethod + def tearDownClass(cls): + cls.tearDownRecursor() + + @classmethod + def generateRecursorConfig(cls, confdir): + super(testTooLargeToAddZeroScope, cls).generateRecursorConfig(confdir) + + def testTooLarge(self): + qname = 'toolarge.ecs.' + ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24) + query = dns.message.make_query(qname, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512) + + # should not have an ECS Option since the packet is too large already + res = self.sendUDPQuery(query, timeout=5.0) + self.assertRcodeEqual(res, dns.rcode.NOERROR) + self.assertEquals(len(res.answer), 1) + self.assertEqual(res.edns, 0) + self.assertEqual(len(res.options), 0) + + res = self.sendTCPQuery(query, timeout=5.0) + self.assertRcodeEqual(res, dns.rcode.NOERROR) + self.assertEquals(len(res.answer), 1) + self.assertEqual(res.edns, 0) + self.assertEqual(len(res.options), 1) + self.assertEqual(res.options[0].otype, 8) + self.assertEqual(res.options[0].scope, 0) + class UDPECSResponder(DatagramProtocol): @staticmethod def ipToStr(option):