]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
auth-py tests: test ECS in ALIAS forwarding
authorPeter van Dijk <peter.van.dijk@powerdns.com>
Tue, 7 Feb 2023 14:26:20 +0000 (15:26 +0100)
committerPeter van Dijk <peter.van.dijk@powerdns.com>
Thu, 11 Jan 2024 10:57:41 +0000 (11:57 +0100)
regression-tests.auth-py/test_ALIAS.py

index a9ff3ae1bd12cf1f44a9ea2a42a0c777d8ae0694..dda27331bd5ad16153930d328d7ade94e22b5029 100644 (file)
@@ -4,6 +4,7 @@ from __future__ import print_function
 
 import threading
 import unittest
+import clientsubnetoption
 
 import dns
 from twisted.internet.protocol import DatagramProtocol
@@ -20,6 +21,7 @@ expand-alias=yes
 resolver=%s.1:5301
 any-to-tcp=no
 launch=bind
+edns-subnet-processing=yes
 """
 
     _config_params = ['_PREFIX']
@@ -34,7 +36,8 @@ ns2.example.org.             3600 IN A    {prefix}.11
 
 noerror.example.org.         3600 IN ALIAS noerror.example.com.
 nxd.example.org.             3600 IN ALIAS nxd.example.com.
-servfail.example.org.        3600 IN ALIAS servfail.example.com
+servfail.example.org.        3600 IN ALIAS servfail.example.com.
+subnet.example.org.          3600 IN ALIAS subnet.example.com.
         """,
     }
 
@@ -171,6 +174,30 @@ servfail.example.org.        3600 IN ALIAS servfail.example.com
         res = self.sendTCPQuery(query)
         self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
 
+    def testECS(self):
+        expected_a = [dns.rrset.from_text('subnet.example.org.',
+                                          0, dns.rdataclass.IN, 'A',
+                                          '192.0.2.1')]
+        expected_aaaa = [dns.rrset.from_text('subnet.example.org.',
+                                             0, dns.rdataclass.IN, 'AAAA',
+                                             '2001:DB8::1')]
+
+        ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
+        ecso2 = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24, 22)
+        query = dns.message.make_query('subnet.example.org', 'A', use_edns=True, options=[ecso])
+        res = self.sendUDPQuery(query)
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertAnyRRsetInAnswer(res, expected_a)
+        self.assertEqual(res.options[0], ecso2)
+
+        ecso = clientsubnetoption.ClientSubnetOption('2001:db8:db6:db5::', 64)
+        ecso2 = clientsubnetoption.ClientSubnetOption('2001:db8:db6::', 64, 48)
+        query = dns.message.make_query('subnet.example.org', 'A', use_edns=True, options=[ecso])
+        res = self.sendUDPQuery(query)
+        self.assertRcodeEqual(res, dns.rcode.NOERROR)
+        self.assertAnyRRsetInAnswer(res, expected_a)
+        self.assertEqual(res.options[0], ecso2)
+
 
 class AliasUDPResponder(DatagramProtocol):
     def datagramReceived(self, datagram, address):
@@ -183,7 +210,12 @@ class AliasUDPResponder(DatagramProtocol):
         name = question.name
         name_text = name.to_text()
 
-        if name_text == 'noerror.example.com.':
+        if name_text in ('noerror.example.com.', 'subnet.example.com.'):
+
+            do_ecs = False
+            if name_text == 'subnet.example.com.':
+                do_ecs=True
+
             response.set_rcode(dns.rcode.NOERROR)
             if question.rdtype in [dns.rdatatype.A,
                                               dns.rdatatype.ANY]:
@@ -198,6 +230,14 @@ class AliasUDPResponder(DatagramProtocol):
                     dns.rrset.from_text(name,
                                         0, dns.rdataclass.IN, 'AAAA',
                                         '2001:DB8::1'))
+
+            if do_ecs:
+                if request.options[0].family == clientsubnetoption.FAMILY_IPV4:
+                    ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24, 22)
+                else:
+                    ecso = clientsubnetoption.ClientSubnetOption('2001:db8:db6::', 64, 48)
+                response.use_edns(edns=True, options=[ecso])
+
         if name_text == 'nxd.example.com.':
             response.set_rcode(dns.rcode.NXDOMAIN)
             response.authority.append(