]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
feat(dnsdist): Add IPCrypt2 PFX to tests
authorPieter Lexis <pieter.lexis@powerdns.com>
Mon, 15 Sep 2025 10:39:09 +0000 (12:39 +0200)
committerPieter Lexis <pieter.lexis@powerdns.com>
Thu, 2 Oct 2025 12:07:03 +0000 (14:07 +0200)
Signed-off-by: Pieter Lexis <pieter.lexis@powerdns.com>
regression-tests.dnsdist/test_Protobuf.py

index eb51483961b3cd294a7dc2a34613bc38d12b9f77..244b5c4d7ff286181a142daee7d101ce96e08b6e 100644 (file)
@@ -948,6 +948,98 @@ class TestProtobufIPCipher(DNSDistProtobufTest):
         self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, target, 3600)
         self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1')
 
+class TestProtobufIPCrypt2PFX(DNSDistProtobufTest):
+    _config_params = ['_testServerPort', '_protobufServerPort', '_protobufServerID', '_protobufServerID']
+    _config_template = """
+    newServer{address="127.0.0.1:%d", useClientSubnet=true}
+    rl = newRemoteLogger('127.0.0.1:%d')
+    --- 32 byte key
+    key = "12345678901234567890123456789012"
+    addAction(AllRule(), RemoteLogAction(rl, nil, {serverID='%s', ipEncryptKey=key, ipEncryptMethod='ipcrypt-pfx'})) -- Send protobuf message before lookup
+    addResponseAction(AllRule(), RemoteLogResponseAction(rl, nil, true, {serverID='%s', ipEncryptKey=key, ipEncryptMethod='ipcrypt-pfx'})) -- Send protobuf message after lookup
+    """
+
+    def testProtobuf(self):
+        """
+        Protobuf: Send data to a protobuf server, with pseudonymization
+        """
+        name = 'query.protobuf-ipcipher.tests.powerdns.com.'
+
+        target = 'target.protobuf-ipcipher.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        response = dns.message.make_response(query)
+
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.CNAME,
+                                    target)
+        response.answer.append(rrset)
+
+        rrset = dns.rrset.from_text(target,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEqual(query, receivedQuery)
+        self.assertEqual(response, receivedResponse)
+
+        if self._protobufQueue.empty():
+            # let the protobuf messages the time to get there
+            time.sleep(1)
+
+        # check the protobuf message corresponding to the UDP query
+        msg = self.getFirstProtobufMessage()
+
+        # 108.41.239.98 is 127.0.0.1 pseudonymized with ipcrypt2-pfx and the current key
+        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name, '109.33.15.148')
+
+        # check the protobuf message corresponding to the UDP response
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, response, '109.33.15.148')
+
+        self.assertEqual(len(msg.response.rrs), 2)
+        rr = msg.response.rrs[0]
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.CNAME, name, 3600)
+        self.assertEqual(rr.rdata.decode('ascii'), target)
+        rr = msg.response.rrs[1]
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, target, 3600)
+        self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1')
+
+        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = query.id
+        self.assertEqual(query, receivedQuery)
+        self.assertEqual(response, receivedResponse)
+
+        if self._protobufQueue.empty():
+            # let the protobuf messages the time to get there
+            time.sleep(1)
+
+        # check the protobuf message corresponding to the TCP query
+        msg = self.getFirstProtobufMessage()
+        # 108.41.239.98 is 127.0.0.1 pseudonymized with ipcrypt2-pfx and the current key
+        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.TCP, query, dns.rdataclass.IN, dns.rdatatype.A, name, '109.33.15.148')
+
+        # check the protobuf message corresponding to the TCP response
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.TCP, response, '109.33.15.148')
+        self.assertEqual(len(msg.response.rrs), 2)
+        rr = msg.response.rrs[0]
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.CNAME, name, 3600)
+        self.assertEqual(rr.rdata.decode('ascii'), target)
+        rr = msg.response.rrs[1]
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, target, 3600)
+        self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1')
+
+
 class TestProtobufQUIC(DNSDistProtobufTest):
 
     _serverKey = 'server.key'