emptyECSText = 'No ECS received'
nameECS = 'ecs-echo.example.'
+nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'
ttlECS = 60
ecsReactorRunning = False
daemon=no
trace=yes
dont-query=
+ecs-add-for=0.0.0.0/0
local-address=127.0.0.1
packetcache-ttl=0
packetcache-servfail-ttl=0
_config_template = """edns-subnet-whitelist=ecs-echo.example.
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
""" % (os.environ['PREFIX'])
def testSendECS(self):
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=2001:db8::42
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
""" % (os.environ['PREFIX'])
def testSendECS(self):
ecs-ipv4-bits=32
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
""" % (os.environ['PREFIX'])
def testSendECS(self):
ecs-ipv4-bits=16
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=192.168.0.1
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
""" % (os.environ['PREFIX'])
def testSendECS(self):
_config_template = """edns-subnet-whitelist=ecs-echo.example.
use-incoming-edns-subnet=yes
ecs-ipv6-bits=128
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
forward-zones=ecs-echo.example=%s.21
query-local-address6=::1
""" % (os.environ['PREFIX'])
use-incoming-edns-subnet=yes
forward-zones=ecs-echo.example=%s.21
ecs-scope-zero-address=::1
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
""" % (os.environ['PREFIX'], os.environ['PREFIX'])
def testSendECS(self):
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected, ttlECS)
+ def testSendECSInvalidScope(self):
+ # test that the recursor does not cache with a more specific scope than the source it sent
+ expected = dns.rrset.from_text(nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
+
+ ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
+ query = dns.message.make_query(nameECSInvalidScope, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+
+ self.sendECSQuery(query, expected)
+
class testECSIPMismatch(ECSTest):
_confdir = 'ECSIPMismatch'
response.flags |= dns.flags.AA
ecso = None
- if request.question[0].name == dns.name.from_text(nameECS) and request.question[0].rdtype == dns.rdatatype.TXT:
+ if (request.question[0].name == dns.name.from_text(nameECS) or request.question[0].name == dns.name.from_text(nameECSInvalidScope)) and request.question[0].rdtype == dns.rdatatype.TXT:
+
text = emptyECSText
for option in request.options:
if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
text = self.ipToStr(option) + '/' + str(option.mask)
- ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)
- answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', text)
+ # Send a scope more specific than the received source for nameECSInvalidScope
+ if request.question[0].name == dns.name.from_text(nameECSInvalidScope):
+ ecso = clientsubnetoption.ClientSubnetOption("192.0.42.42", 32, 32)
+ else:
+ ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)
+
+ answer = dns.rrset.from_text(request.question[0].name, ttlECS, dns.rdataclass.IN, 'TXT', text)
response.answer.append(answer)
+
elif request.question[0].name == dns.name.from_text(nameECS) and request.question[0].rdtype == dns.rdatatype.NS:
answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
response.answer.append(answer)
- additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', cls._PREFIX + '.21')
+ additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
response.additional.append(additional)
if ecso: