emptyECSText = 'No ECS received'
nameECS = 'ecs-echo.example.'
+nameECSInvalidScope = 'invalid-scope.ecs-echo.example.'
ttlECS = 60
ecsReactorRunning = False
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected, ttlECS)
+ def testSendECSInvalidScope(self):
+ # test that the recursor does not cache with a more specific scope than the source it sent
+ expected = dns.rrset.from_text(nameECSInvalidScope, ttlECS, dns.rdataclass.IN, 'TXT', '192.0.2.0/24')
+
+ ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
+ query = dns.message.make_query(nameECSInvalidScope, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+
+ self.sendECSQuery(query, expected)
+
class testECSIPMismatch(ECSTest):
_confdir = 'ECSIPMismatch'
response.flags |= dns.flags.AA
ecso = None
- if request.question[0].name == dns.name.from_text(nameECS) and request.question[0].rdtype == dns.rdatatype.TXT:
+ if (request.question[0].name == dns.name.from_text(nameECS) or request.question[0].name == dns.name.from_text(nameECSInvalidScope)) and request.question[0].rdtype == dns.rdatatype.TXT:
+
text = emptyECSText
for option in request.options:
if option.otype == clientsubnetoption.ASSIGNED_OPTION_CODE and isinstance(option, clientsubnetoption.ClientSubnetOption):
text = self.ipToStr(option) + '/' + str(option.mask)
- ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)
- answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'TXT', text)
+ # Send a scope more specific than the received source for nameECSInvalidScope
+ if request.question[0].name == dns.name.from_text(nameECSInvalidScope):
+ ecso = clientsubnetoption.ClientSubnetOption("192.0.42.42", 32, 32)
+ else:
+ ecso = clientsubnetoption.ClientSubnetOption(self.ipToStr(option), option.mask, option.mask)
+
+ answer = dns.rrset.from_text(request.question[0].name, ttlECS, dns.rdataclass.IN, 'TXT', text)
response.answer.append(answer)
+
elif request.question[0].name == dns.name.from_text(nameECS) and request.question[0].rdtype == dns.rdatatype.NS:
answer = dns.rrset.from_text(nameECS, ttlECS, dns.rdataclass.IN, 'NS', 'ns1.ecs-echo.example.')
response.answer.append(answer)