disable-syslog=yes
"""
- def sendECSQuery(self, query, expected, expectedFirstTTL=None):
+ def sendECSQuery(self, query, expected, expectedFirstTTL=None, scopeZeroResponse=None):
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertEqual(res.answer[0].ttl, expectedFirstTTL)
else:
expectedFirstTTL = res.answer[0].ttl
+ self.assertEquals(res.edns, query.edns)
+
+ if scopeZeroResponse is not None:
+ self.assertEqual(res.edns, 0)
+ if scopeZeroResponse:
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 8)
+ self.assertEqual(res.options[0].scope, 0)
+ else:
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 8)
+ self.assertNotEqual(res.options[0].scope, 0)
# wait one second, check that the TTL has been
# decreased indicating a cache hit
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertRRsetInAnswer(res, expected)
self.assertLess(res.answer[0].ttl, expectedFirstTTL)
+ self.assertEquals(res.edns, query.edns)
def checkECSQueryHit(self, query, expected):
res = self.sendUDPQuery(query)
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- self.sendECSQuery(query, expected)
+ self.sendECSQuery(query, expected, scopeZeroResponse=True)
def testNoECS(self):
expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- self.sendECSQuery(query, expected)
+ self.sendECSQuery(query, expected, scopeZeroResponse=True)
class testECSByName(ECSTest):
_confdir = 'ECSByName'
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected)
+class testTooLargeToAddZeroScope(RecursorTest):
+
+ _confdir = 'TooLargeToAddZeroScope'
+ _config_template_default = """
+use-incoming-edns-subnet=yes
+dnssec=validate
+daemon=no
+trace=yes
+packetcache-ttl=0
+packetcache-servfail-ttl=0
+max-cache-ttl=15
+threads=1
+loglevel=9
+disable-syslog=yes
+log-common-errors=yes
+"""
+ _config_template = """
+ """
+ _lua_dns_script_file = """
+ function preresolve(dq)
+ if dq.qname == newDN('toolarge.ecs.') then
+ dq:addRecord(pdns.TXT, '%s', pdns.place.ANSWER)
+ return true
+ end
+ return false
+ end
+ """ % ('A'*447)
+
+ _roothints = None
+
+ @classmethod
+ def setUpClass(cls):
+
+ # we don't need all the auth stuff
+ cls.setUpSockets()
+ cls.startResponders()
+
+ confdir = os.path.join('configs', cls._confdir)
+ cls.createConfigDir(confdir)
+
+ cls.generateRecursorConfig(confdir)
+ cls.startRecursor(confdir, cls._recursorPort)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.tearDownRecursor()
+
+ @classmethod
+ def generateRecursorConfig(cls, confdir):
+ super(testTooLargeToAddZeroScope, cls).generateRecursorConfig(confdir)
+
+ def testTooLarge(self):
+ qname = 'toolarge.ecs.'
+ ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 24)
+ query = dns.message.make_query(qname, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+
+ # should not have an ECS Option since the packet is too large already
+ res = self.sendUDPQuery(query, timeout=5.0)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEquals(len(res.answer), 1)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 0)
+
+ res = self.sendTCPQuery(query, timeout=5.0)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEquals(len(res.answer), 1)
+ self.assertEqual(res.edns, 0)
+ self.assertEqual(len(res.options), 1)
+ self.assertEqual(res.options[0].otype, 8)
+ self.assertEqual(res.options[0].scope, 0)
+
class UDPECSResponder(DatagramProtocol):
@staticmethod
def ipToStr(option):