]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
also test proxy+ecs
authorPeter van Dijk <peter.van.dijk@powerdns.com>
Thu, 16 Sep 2021 10:04:45 +0000 (12:04 +0200)
committerPeter van Dijk <peter.van.dijk@powerdns.com>
Thu, 16 Sep 2021 10:04:45 +0000 (12:04 +0200)
regression-tests.auth-py/test_ProxyProtocol.py

index c3861cee25028b8674e4cae8ff2e99bbf2ddb54b..2d2b7ad575e37e3bc9883eb97126819a4ad61af3 100644 (file)
@@ -1,3 +1,4 @@
+import clientsubnetoption
 import dns
 import os
 import socket
@@ -14,6 +15,7 @@ class TestProxyProtocolLuaRecords(AuthTest):
 launch=bind
 any-to-tcp=no
 proxy-protocol-from=127.0.0.1
+edns-subnet-processing=yes
 """
 
     _zones = {
@@ -24,7 +26,7 @@ example.org.                 3600 IN NS   ns2.example.org.
 ns1.example.org.             3600 IN A    {prefix}.10
 ns2.example.org.             3600 IN A    {prefix}.11
 
-myip.example.org.            3600 IN LUA  A     "who:toString()"
+myip.example.org.            3600 IN LUA  TXT     "who:toString()..'/'..bestwho:toString()"
         """
     }
 
@@ -37,66 +39,76 @@ myip.example.org.            3600 IN LUA  A     "who:toString()"
         See if LUA who picks up the inner address from the PROXY protocol
         """
         
-        # first test with an unproxied query - should get ignored
-        query = dns.message.make_query('myip.example.org', 'A')
+        for testWithECS in True, False:
+            # first test with an unproxied query - should get ignored
 
-        res = self.sendUDPQuery(query)
+            options = []
+            expectedText = '192.0.2.1/192.0.2.1'
 
-        self.assertEqual(res, None)     # query was ignored correctly
+            if testWithECS:
+                ecso = clientsubnetoption.ClientSubnetOption('192.0.2.5', 32)
+                options.append(ecso)
+                expectedText = '192.0.2.1/192.0.2.5'
 
+            query = dns.message.make_query('myip.example.org', 'TXT', 'IN', use_edns=testWithECS, options=options, payload=512)
 
-        # now send a proxied query
-        queryPayload = query.to_wire()
-        ppPayload = ProxyProtocol.getPayload(False, False, False, "192.0.2.1", "10.1.2.3", 12345, 53, [])
-        payload = ppPayload + queryPayload
+            res = self.sendUDPQuery(query)
 
-        # UDP
-        self._sock.settimeout(2.0)
+            self.assertEqual(res, None)     # query was ignored correctly
 
-        try:
-            self._sock.send(payload)
-            data = self._sock.recv(4096)
-        except socket.timeout:
-            data = None
-        finally:
-            self._sock.settimeout(None)
-
-        res = None
-        if data:
-            res = dns.message.from_wire(data)
-
-        expected = [dns.rrset.from_text('myip.example.org.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')]
-        self.assertRcodeEqual(res, dns.rcode.NOERROR)
-        self.assertEqual(res.answer, expected)
-
-        # TCP
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        sock.settimeout(2.0)
-        sock.connect(("127.0.0.1", self._authPort))
 
-        try:
-            sock.send(ppPayload)
-            sock.send(struct.pack("!H", len(queryPayload)))
-            sock.send(queryPayload)
-            data = sock.recv(2)
+            # now send a proxied query
+            queryPayload = query.to_wire()
+            ppPayload = ProxyProtocol.getPayload(False, False, False, "192.0.2.1", "10.1.2.3", 12345, 53, [])
+            payload = ppPayload + queryPayload
+
+            # UDP
+            self._sock.settimeout(2.0)
+
+            try:
+                self._sock.send(payload)
+                data = self._sock.recv(4096)
+            except socket.timeout:
+                data = None
+            finally:
+                self._sock.settimeout(None)
+
+            res = None
             if data:
-                (datalen,) = struct.unpack("!H", data)
-                data = sock.recv(datalen)
-        except socket.timeout as e:
-            print("Timeout: %s" % (str(e)))
-            data = None
-        except socket.error as e:
-            print("Network error: %s" % (str(e)))
-            data = None
-        finally:
-            sock.close()
-
-        res = None
-        if data:
-            res = dns.message.from_wire(data)
-
-        self.assertRcodeEqual(res, dns.rcode.NOERROR)
-        self.assertEqual(res.answer, expected)
+                res = dns.message.from_wire(data)
+
+            expected = [dns.rrset.from_text('myip.example.org.', 0, dns.rdataclass.IN, 'TXT', expectedText)]
+            self.assertRcodeEqual(res, dns.rcode.NOERROR)
+            self.assertEqual(res.answer, expected)
+
+            # TCP
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock.settimeout(2.0)
+            sock.connect(("127.0.0.1", self._authPort))
+
+            try:
+                sock.send(ppPayload)
+                sock.send(struct.pack("!H", len(queryPayload)))
+                sock.send(queryPayload)
+                data = sock.recv(2)
+                if data:
+                    (datalen,) = struct.unpack("!H", data)
+                    data = sock.recv(datalen)
+            except socket.timeout as e:
+                print("Timeout: %s" % (str(e)))
+                data = None
+            except socket.error as e:
+                print("Network error: %s" % (str(e)))
+                data = None
+            finally:
+                sock.close()
+
+            res = None
+            if data:
+                res = dns.message.from_wire(data)
+
+            self.assertRcodeEqual(res, dns.rcode.NOERROR)
+            self.assertEqual(res.answer, expected)
 
 class TestProxyProtocolNOTIFY(AuthTest):
     _config_template = """