def sendECSQuery(self, query, expected, expectedFirstTTL=None, scopeZeroResponse=None):
res = self.sendUDPQuery(query)
-
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertRRsetInAnswer(res, expected)
# this will break if you are not looking for the first RR, sorry!
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected)
+class NoECSHardenedTest(NoECSTest):
+ _confdir = 'NoECSHardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=
+forward-zones=ecs-echo.example=%s.21
+ """ % (os.environ['PREFIX'])
+
+class NoECSInAnswerTest(ECSTest):
+ _confdir = 'NoECSInAnswer'
+
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _config_template = """edns-subnet-allow-list=xecs-echo.example
+forward-zones=xecs-echo.example=%s.21
+webserver=yes
+webserver-port=%d
+webserver-address=127.0.0.1
+webserver-password=%s
+webserver-allow-from=127.0.0.1
+api-key=%s
+ """ % (os.environ['PREFIX'], _wsPort, _wsPassword, _apiKey)
+
+ def test1SendECS(self):
+ expected = dns.rrset.from_text('x'+ nameECS, ttlECS, dns.rdataclass.IN, 'TXT', 'X')
+ ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
+ query = dns.message.make_query('x' + nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+ self.sendECSQuery(query, expected)
+ self.checkMetrics({
+ 'ecs-missing': 0
+ })
+
+ def test2NoECS(self):
+ expected = dns.rrset.from_text('x' + nameECS, ttlECS, dns.rdataclass.IN, 'TXT', 'X')
+ query = dns.message.make_query('x' + nameECS, 'TXT')
+ self.sendECSQuery(query, expected)
+ self.checkMetrics({
+ 'ecs-missing': 0
+ })
+
+ def test3RequireNoECS(self):
+ expected = dns.rrset.from_text('x' + nameECS, ttlECS, dns.rdataclass.IN, 'TXT', 'X')
+
+ ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
+ query = dns.message.make_query('x' + nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+ self.sendECSQuery(query, expected)
+ self.checkMetrics({
+ 'ecs-missing': 0
+ })
+
+class NoECSInAnswerHardenedTest(NoECSInAnswerTest):
+ _confdir = 'NoECSInAnswerHardened'
+
+ _wsPort = 8042
+ _wsTimeout = 2
+ _wsPassword = 'secretpassword'
+ _apiKey = 'secretapikey'
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=xecs-echo.example
+forward-zones=xecs-echo.example=%s.21
+webserver=yes
+webserver-port=%d
+webserver-address=127.0.0.1
+webserver-password=%s
+webserver-allow-from=127.0.0.1
+api-key=%s
+ """ % (os.environ['PREFIX'], _wsPort, _wsPassword, _apiKey)
+
+ # All test below have ecs-missing count to be 1, as they result is a no ecs scoped answer in the cache
+ def test1SendECS(self):
+ expected = dns.rrset.from_text('x'+ nameECS, ttlECS, dns.rdataclass.IN, 'TXT', 'X')
+ ecso = clientsubnetoption.ClientSubnetOption('192.0.2.1', 32)
+ query = dns.message.make_query('x' + nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+ self.sendECSQuery(query, expected)
+ self.checkMetrics({
+ 'ecs-missing': 1
+ })
+
+ def test2NoECS(self):
+ expected = dns.rrset.from_text('x' + nameECS, ttlECS, dns.rdataclass.IN, 'TXT', 'X')
+ query = dns.message.make_query('x' + nameECS, 'TXT')
+ self.sendECSQuery(query, expected)
+ self.checkMetrics({
+ 'ecs-missing': 1
+ })
+
+ def test3RequireNoECS(self):
+ expected = dns.rrset.from_text('x' + nameECS, ttlECS, dns.rdataclass.IN, 'TXT', 'X')
+ ecso = clientsubnetoption.ClientSubnetOption('0.0.0.0', 0)
+ query = dns.message.make_query('x' + nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
+ self.sendECSQuery(query, expected)
+ self.checkMetrics({
+ 'ecs-missing': 1
+ })
+
class IncomingNoECSTest(ECSTest):
_confdir = 'IncomingNoECS'
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected, scopeZeroResponse=True)
+class IncomingNoECSHardenedTest(IncomingNoECSTest):
+ _confdir = 'IncomingNoECSHardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=
+use-incoming-edns-subnet=yes
+forward-zones=ecs-echo.example=%s.21
+ """ % (os.environ['PREFIX'])
+
class ECSByNameTest(ECSTest):
_confdir = 'ECSByName'
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected)
+class ECSByNameLargerHardenedTest(ECSByNameLargerTest):
+ _confdir = 'ECSByNameLargerHardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=ecs-echo.example.
+ecs-ipv4-bits=32
+forward-zones=ecs-echo.example=%s.21
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
+ """ % (os.environ['PREFIX'])
+
class ECSByNameSmallerTest(ECSTest):
_confdir = 'ECSByNameSmaller'
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected)
+class ECSByNameSmallerHardenedTest(ECSByNameSmallerTest):
+ _confdir = 'ECSByNameSmallerHardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=ecs-echo.example.
+ecs-ipv4-bits=16
+forward-zones=ecs-echo.example=%s.21
+ """ % (os.environ['PREFIX'])
+
class IncomingECSByNameTest(ECSTest):
_confdir = 'IncomingECSByName'
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected, ttlECS)
+class IncomingECSByNameHardenedTest(IncomingECSByNameTest):
+ _confdir = 'IncomingECSByNameHardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=ecs-echo.example.
+use-incoming-edns-subnet=yes
+forward-zones=ecs-echo.example=%s.21
+ecs-scope-zero-address=2001:db8::42
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
+ """ % (os.environ['PREFIX'])
+
class IncomingECSByNameLargerTest(ECSTest):
_confdir = 'IncomingECSByNameLarger'
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected, ttlECS)
+class IncomingECSByNameLargerHardenedTest(IncomingECSByNameLargerTest):
+ _confdir = 'IncomingECSByNameLargerHardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=ecs-echo.example.
+use-incoming-edns-subnet=yes
+ecs-ipv4-bits=32
+forward-zones=ecs-echo.example=%s.21
+ecs-scope-zero-address=192.168.0.1
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
+ """ % (os.environ['PREFIX'])
+
class IncomingECSByNameSmallerTest(ECSTest):
_confdir = 'IncomingECSByNameSmaller'
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected, ttlECS)
+class IncomingECSByNameSmallerHardenedTest(IncomingECSByNameSmallerTest):
+ _confdir = 'IncomingECSByNameSmallerHardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=ecs-echo.example.
+use-incoming-edns-subnet=yes
+ecs-ipv4-bits=16
+forward-zones=ecs-echo.example=%s.21
+ecs-scope-zero-address=192.168.0.1
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
+ """ % (os.environ['PREFIX'])
+
@unittest.skipIf(not have_ipv6(), "No IPv6")
class IncomingECSByNameV6Test(ECSTest):
_confdir = 'IncomingECSByNameV6'
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected)
+@unittest.skipIf(not have_ipv6(), "No IPv6")
+class IncomingECSByNameV6HardenedTest(IncomingECSByNameV6Test):
+ _confdir = 'IncomingECSByNameV6Hardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=ecs-echo.example.
+use-incoming-edns-subnet=yes
+ecs-ipv6-bits=128
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
+query-local-address=::1
+forward-zones=ecs-echo.example=[::1]:53000
+ """
+
class ECSByIPTest(ECSTest):
_confdir = 'ECSByIP'
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected)
+class ECSByIPHardenedTest(ECSByIPTest):
+ _confdir = 'ECSByIPHardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=%s.21
+forward-zones=ecs-echo.example=%s.21
+ """ % (os.environ['PREFIX'], os.environ['PREFIX'])
+
class IncomingECSByIPTest(ECSTest):
_confdir = 'IncomingECSByIP'
self.sendECSQuery(query, expected)
+class IncomingECSByIPHardenedTest(IncomingECSByIPTest):
+ _confdir = 'IncomingECSByIPHardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=%s.21
+use-incoming-edns-subnet=yes
+forward-zones=ecs-echo.example=%s.21
+ecs-scope-zero-address=::1
+ecs-ipv4-cache-bits=32
+ecs-ipv6-cache-bits=128
+ """ % (os.environ['PREFIX'], os.environ['PREFIX'])
+
class ECSIPMismatchTest(ECSTest):
_confdir = 'ECSIPMismatch'
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected)
+class ECSIPMismatchHardenedTest(ECSIPMismatchTest):
+ _confdir = 'ECSIPMismatchHardened'
+
+ _config_template = """
+edns-subnet-harden=yes
+edns-subnet-allow-list=192.0.2.1
+forward-zones=ecs-echo.example=%s.21
+ """ % (os.environ['PREFIX'])
+
class ECSWithProxyProtocolRecursorTest(ECSTest):
_confdir = 'ECSWithProxyProtocolRecursor'
_config_template = """
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertRRsetInAnswer(res, expected)
+class ECSWithProxyProtocolRecursorHardenedTest(ECSWithProxyProtocolRecursorTest):
+ _confdir = 'ECSWithProxyProtocolRecursorHardened'
+ _config_template = """
+ edns-subnet-harden=yes
+ ecs-add-for=2001:db8::1/128
+ edns-subnet-allow-list=ecs-echo.example.
+ forward-zones=ecs-echo.example=%s.21
+ proxy-protocol-from=127.0.0.1/32
+ allow-from=2001:db8::1/128
+""" % (os.environ['PREFIX'])
+
class TooLargeToAddZeroScopeTest(RecursorTest):
_confdir = 'TooLargeToAddZeroScope'
additional = dns.rrset.from_text('ns1.ecs-echo.example.', 15, dns.rdataclass.IN, 'A', os.environ['PREFIX'] + '.21')
response.additional.append(additional)
+ elif request.question[0].name == dns.name.from_text('x' + nameECS):
+ answer = dns.rrset.from_text(request.question[0].name, ttlECS, dns.rdataclass.IN, 'TXT', 'X')
+ response.answer.append(answer)
+
if ecso:
response.use_edns(options = [ecso])