From: Peter van Dijk Date: Tue, 7 Feb 2023 14:26:20 +0000 (+0100) Subject: auth-py tests: test ECS in ALIAS forwarding X-Git-Tag: auth-4.9.0-alpha1^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=93e579321f8cb8dc2c6dca688194c4fadb2105da;p=thirdparty%2Fpdns.git auth-py tests: test ECS in ALIAS forwarding
---
diff --git a/regression-tests.auth-py/test_ALIAS.py b/regression-tests.auth-py/test_ALIAS.py
index a9ff3ae1bd..dda27331bd 100644
--- a/regression-tests.auth-py/test_ALIAS.py
+++ b/regression-tests.auth-py/test_ALIAS.py
@@ -4,6 +4,7 @@ from __future__ import print_function
 import threading
 import unittest
+import clientsubnetoption
 import dns
 from twisted.internet.protocol import DatagramProtocol
@@ -20,6 +21,7 @@ expand-alias=yes
 resolver=%s.1:5301
 any-to-tcp=no
 launch=bind
+edns-subnet-processing=yes
 """
 _config_params = ['_PREFIX']
@@ -34,7 +36,8 @@ ns2.example.org. 3600 IN A {prefix}.11
 noerror.example.org. 3600 IN ALIAS noerror.example.com.
 nxd.example.org. 3600 IN ALIAS nxd.example.com.
-servfail.example.org. 3600 IN ALIAS servfail.example.com
+servfail.example.org. 3600 IN ALIAS servfail.example.com.
+subnet.example.org. 3600 IN ALIAS subnet.example.com.
 """,
 }
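Review bot: `import clientsubnetoption` in the first hunk adds a third-party module to this test file, yet the only file visible in this commit is test_ALIAS.py. If the auth-py suite's requirements file does not already list that package (it is not shown here, so this may well be covered), the whole module fails at import time rather than just testECS. It is worth confirming the package is declared there, and pinned, like the suite's other dependencies.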
@@ -171,6 +174,30 @@ servfail.example.org. 3600 IN ALIAS servfail.example.com
 res = self.sendTCPQuery(query)
 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ def testECS(self):
+ expected_a = [dns.rrset.from_text('subnet.example.org.',
+ 0, dns.rdataclass.IN, 'A',
+ '192.0.2.1')]
+ expected_aaaa = [dns.rrset.from_text('subnet.example.org.',
+ 0, dns.rdataclass.IN, 'AAAA',
+ '2001:DB8::1')]
+
+ ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
+ ecso2 = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24, 22)
+ query = dns.message.make_query('subnet.example.org', 'A', use_edns=True, options=[ecso])
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertAnyRRsetInAnswer(res, expected_a)
+ self.assertEqual(res.options[0], ecso2)
+
+ ecso = clientsubnetoption.ClientSubnetOption('2001:db8:db6:db5::', 64)
+ ecso2 = clientsubnetoption.ClientSubnetOption('2001:db8:db6::', 64, 48)
+ query = dns.message.make_query('subnet.example.org', 'A', use_edns=True, options=[ecso])
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertAnyRRsetInAnswer(res, expected_a)
+ self.assertEqual(res.options[0], ecso2)
+
 class AliasUDPResponder(DatagramProtocol):
 def datagramReceived(self, datagram, address):
@@ -183,7 +210,12 @@ class AliasUDPResponder(DatagramProtocol):
 name = question.name
 name_text = name.to_text()
- if name_text == 'noerror.example.com.':
+ if name_text in ('noerror.example.com.', 'subnet.example.com.'):
+
+ do_ecs = False
+ if name_text == 'subnet.example.com.':
+ do_ecs=True
+
 response.set_rcode(dns.rcode.NOERROR)
 if question.rdtype in [dns.rdatatype.A, dns.rdatatype.ANY]:
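Review bot: `expected_aaaa` is built at the top of testECS but never used, and the second block (IPv6 client subnet) again queries type 'A' and checks `expected_a`. If the IPv6 block is only meant to vary the address family of the ECS option, drop `expected_aaaa`; otherwise that query should ask for 'AAAA' and end with `self.assertAnyRRsetInAnswer(res, expected_aaaa)`. Both blocks also index `res.options[0]` directly, so a reply that carries no EDNS option raises IndexError instead of failing an assertion; `self.assertEqual(len(res.options), 1)` before the comparison makes that failure readable. A companion case that sends no ECS option and asserts none comes back would pin the other half of the behaviour.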
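Review bot: `do_ecs` is bound only inside the `if name_text in (...)` branch, while a later hunk reads it in `if do_ecs:`. Indentation is not recoverable from this page, so if that read sits outside the branch, the name is unbound for 'nxd.example.com.' and the other names and the responder raises instead of answering. Setting it once next to `name_text`, as `do_ecs = name_text == 'subnet.example.com.'`, removes the question and replaces the three added lines (one of which, `do_ecs=True`, lacks the spaces around `=` that the neighbouring lines use). datagramReceived starts and ends outside the hunk.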
@@ -198,6 +230,14 @@ class AliasUDPResponder(DatagramProtocol): dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'AAAA', '2001:DB8::1')) + + if do_ecs: + if request.options[0].family == clientsubnetoption.FAMILY_IPV4: + ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24, 22) + else: + ecso = clientsubnetoption.ClientSubnetOption('2001:db8:db6::', 64, 48) + response.use_edns(edns=True, options=[ecso]) + if name_text == 'nxd.example.com.': response.set_rcode(dns.rcode.NXDOMAIN) response.authority.append(
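Review bot: `request.options[0].family` in the `if do_ecs:` branch assumes the forwarded query always carries an option and that the first one is the client-subnet option. If the server under test ever forwards the ALIAS lookup without ECS, this raises IndexError inside datagramReceived and the test fails on a missing upstream reply rather than on its own option check. The branch also answers with a fixed scope whatever subnet arrived, so nothing verifies that the source prefix reaching the resolver is the one the client sent rather than a longer, more revealing one. Selecting the option by type, and ideally comparing its address and mask with the expected source, would cover both; datagramReceived starts outside the hunk.

```python
if do_ecs:
    ecs_in = next((o for o in request.options
                   if isinstance(o, clientsubnetoption.ClientSubnetOption)), None)
    # No reply scope when nothing was forwarded, so testECS fails on its option check.
    if ecs_in is not None:
        if ecs_in.family == clientsubnetoption.FAMILY_IPV4:
            ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24, 22)
        else:
            ecso = clientsubnetoption.ClientSubnetOption('2001:db8:db6::', 64, 48)
        response.use_edns(edns=True, options=[ecso])
```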