From: Pieter Lexis Date: Mon, 15 Sep 2025 10:39:09 +0000 (+0200) Subject: feat(dnsdist): Add IPCrypt2 PFX to tests X-Git-Tag: rec-5.4.0-alpha1~208^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d5508a21a49d92aa11a7bfe448805cce350fb0a9;p=thirdparty%2Fpdns.git feat(dnsdist): Add IPCrypt2 PFX to tests Signed-off-by: Pieter Lexis --- diff --git a/regression-tests.dnsdist/test_Protobuf.py b/regression-tests.dnsdist/test_Protobuf.py index eb51483961..244b5c4d7f 100644 --- a/regression-tests.dnsdist/test_Protobuf.py +++ b/regression-tests.dnsdist/test_Protobuf.py @@ -948,6 +948,98 @@ class TestProtobufIPCipher(DNSDistProtobufTest): self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, target, 3600) self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1') +class TestProtobufIPCrypt2PFX(DNSDistProtobufTest): + _config_params = ['_testServerPort', '_protobufServerPort', '_protobufServerID', '_protobufServerID'] + _config_template = """ + newServer{address="127.0.0.1:%d", useClientSubnet=true} + rl = newRemoteLogger('127.0.0.1:%d') + --- 32 byte key + key = "12345678901234567890123456789012" + addAction(AllRule(), RemoteLogAction(rl, nil, {serverID='%s', ipEncryptKey=key, ipEncryptMethod='ipcrypt-pfx'})) -- Send protobuf message before lookup + addResponseAction(AllRule(), RemoteLogResponseAction(rl, nil, true, {serverID='%s', ipEncryptKey=key, ipEncryptMethod='ipcrypt-pfx'})) -- Send protobuf message after lookup + """ + + def testProtobuf(self): + """ + Protobuf: Send data to a protobuf server, with pseudonymization + """ + name = 'query.protobuf-ipcipher.tests.powerdns.com.' + + target = 'target.protobuf-ipcipher.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + + rrset = dns.rrset.from_text(name, + 3600, + dns.rdataclass.IN, + dns.rdatatype.CNAME, + target) + response.answer.append(rrset) + + rrset = dns.rrset.from_text(target, + 3600, + dns.rdataclass.IN, + dns.rdatatype.A, + '127.0.0.1') + response.answer.append(rrset) + + (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEqual(query, receivedQuery) + self.assertEqual(response, receivedResponse) + + if self._protobufQueue.empty(): + # let the protobuf messages the time to get there + time.sleep(1) + + # check the protobuf message corresponding to the UDP query + msg = self.getFirstProtobufMessage() + + # 108.41.239.98 is 127.0.0.1 pseudonymized with ipcrypt2-pfx and the current key + self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name, '109.33.15.148') + + # check the protobuf message corresponding to the UDP response + msg = self.getFirstProtobufMessage() + self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, response, '109.33.15.148') + + self.assertEqual(len(msg.response.rrs), 2) + rr = msg.response.rrs[0] + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.CNAME, name, 3600) + self.assertEqual(rr.rdata.decode('ascii'), target) + rr = msg.response.rrs[1] + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, target, 3600) + self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1') + + (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEqual(query, receivedQuery) + self.assertEqual(response, receivedResponse) + + if self._protobufQueue.empty(): + # let the protobuf messages the time to get there + time.sleep(1) + + # check the protobuf message corresponding to the TCP query + msg = self.getFirstProtobufMessage() + # 108.41.239.98 is 127.0.0.1 pseudonymized with ipcrypt2-pfx and the current key + self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.TCP, query, dns.rdataclass.IN, dns.rdatatype.A, name, '109.33.15.148') + + # check the protobuf message corresponding to the TCP response + msg = self.getFirstProtobufMessage() + self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.TCP, response, '109.33.15.148') + self.assertEqual(len(msg.response.rrs), 2) + rr = msg.response.rrs[0] + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.CNAME, name, 3600) + self.assertEqual(rr.rdata.decode('ascii'), target) + rr = msg.response.rrs[1] + self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, target, 3600) + self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1') + + class TestProtobufQUIC(DNSDistProtobufTest): _serverKey = 'server.key'