import socket
import struct
import threading
+import time
import clientsubnetoption
from recursortests import RecursorTest
from twisted.internet.protocol import DatagramProtocol
emptyECSText = 'No ECS received'
nameECS = 'ecs-echo.example.'
+ttlECS = 60
class ECSTest(RecursorTest):
+ _config_template_default = """
+daemon=no
+trace=yes
+dont-query=
+local-address=127.0.0.1
+packetcache-ttl=0
+packetcache-servfail-ttl=0
+max-cache-ttl=600
+threads=1
+loglevel=9
+disable-syslog=yes
+"""
+
+ def sendECSQuery(self, query, expected, expectedFirstTTL=None):
+ res = self.sendUDPQuery(query)
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertRRsetInAnswer(res, expected)
+ # this will break if you are not looking for the first RR, sorry!
+ if expectedFirstTTL is not None:
+ self.assertEqual(res.answer[0].ttl, expectedFirstTTL)
+ else:
+ expectedFirstTTL = res.answer[0].ttl
+
+ # wait one second, check that the TTL has been
+ # decreased indicating a cache hit
+ time.sleep(1)
+
+ res = self.sendUDPQuery(query)
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertRRsetInAnswer(res, expected)
+ self.assertLess(res.answer[0].ttl, expectedFirstTTL)
+
+ def checkECSQueryHit(self, query, expected):
+ res = self.sendUDPQuery(query)
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertRRsetInAnswer(res, expected)
+ # this will break if you are not looking for the first RR, sorry!
+ self.assertLess(res.answer[0].ttl, ttlECS)
@classmethod
def startResponders(cls):
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
class testIncomingNoECS(ECSTest):
_confdir = 'IncomingNoECS'
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
class testECSByName(ECSTest):
_confdir = 'ECSByName'
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
+ self.sendECSQuery(query, expected)
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ # check that a query in a different ECS range is a hit, because we don't use the incoming ECS
+ ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
+ query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+ self.checkECSQueryHit(query, expected)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
class testECSByNameLarger(ECSTest):
_confdir = 'ECSByNameLarger'
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
+ self.sendECSQuery(query, expected)
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ # check that a query in a different range is a miss
+ ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
+ query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+ self.sendECSQuery(query, expected)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
class testECSByNameSmaller(ECSTest):
_confdir = 'ECSByNameLarger'
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
class testIncomingECSByName(ECSTest):
_confdir = 'ECSIncomingByName'
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
+ self.sendECSQuery(query, expected, ttlECS)
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ # check that a query in the same ECS range is a hit
+ ecso = clientsubnetoption.ClientSubnetOption('192.0.2.2', 32)
+ query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+ self.checkECSQueryHit(query, expected)
- def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
+ # check that a query in a different ECS range is a miss
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.1.2.0/24')
+ ecso = clientsubnetoption.ClientSubnetOption('192.1.2.2', 32)
+ query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+ self.sendECSQuery(query, expected)
+ def testNoECS(self):
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected, ttlECS)
class testIncomingECSByNameLarger(ECSTest):
_confdir = 'ECSIncomingByNameLarger'
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.1/32')
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected, ttlECS)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.1/32')
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected, ttlECS)
class testIncomingECSByNameSmaller(ECSTest):
_confdir = 'ECSIncomingByNameSmaller'
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.0.0/16')
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/16')
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
class testIncomingECSByNameV6(ECSTest):
_confdir = 'ECSIncomingByNameV6'
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '2001:db8::1/128')
ecso = clientsubnetoption.ClientSubnetOption('2001:db8::1', 128)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected, ttlECS)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
query = dns.message.make_query(nameECS, 'TXT')
res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected, ttlECS)
class testECSNameMismatch(ECSTest):
_confdir = 'ECSNameMismatch'
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
class testECSByIP(ECSTest):
_confdir = 'ECSByIP'
""" % (os.environ['PREFIX'], os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
class testIncomingECSByIP(ECSTest):
_confdir = 'ECSIncomingByIP'
""" % (os.environ['PREFIX'], os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', '127.0.0.0/24')
query = dns.message.make_query(nameECS, 'TXT')
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
class testECSIPMismatch(ECSTest):
_confdir = 'ECSIPMismatch'
""" % (os.environ['PREFIX'])
def testSendECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
- res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
def testNoECS(self):
- expected = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', emptyECSText)
-
+ expected = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', emptyECSText)
query = dns.message.make_query(nameECS, 'TXT')
res = self.sendUDPQuery(query)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertRRsetInAnswer(res, expected)
+ self.sendECSQuery(query, expected)
class UDPECSResponder(DatagramProtocol):
@staticmethod
request = dns.message.from_wire(datagram)
response = dns.message.make_response(request)
+ response.flags |= dns.flags.AA
+ ecso = None
if request.question[0].name == dns.name.from_text(nameECS) and request.question[0].rdtype == dns.rdatatype.TXT:
text = emptyECSText
for option in request.options:
if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
text = self.ipToStr(option) + '/' + str(option.mask)
+ ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)
- answer = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'TXT', text)
+ answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', text)
response.answer.append(answer)
elif request.question[0].name == dns.name.from_text(nameECS) and request.question[0].rdtype == dns.rdatatype.NS:
- answer = dns.rrset.from_text(nameECS, 60, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
+ answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
response.answer.append(answer)
additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', cls._PREFIX + '.21')
response.additional.append(additional)
+ if ecso:
+ response.options = [ecso]
+
self.transport.write(response.to_wire(), address)