self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, target, 3600)
self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1')
+class TestProtobufIPCrypt2PFX(DNSDistProtobufTest):
+ _config_params = ['_testServerPort', '_protobufServerPort', '_protobufServerID', '_protobufServerID']
+ _config_template = """
+ newServer{address="127.0.0.1:%d", useClientSubnet=true}
+ rl = newRemoteLogger('127.0.0.1:%d')
+ --- 32 byte key
+ key = "12345678901234567890123456789012"
+ addAction(AllRule(), RemoteLogAction(rl, nil, {serverID='%s', ipEncryptKey=key, ipEncryptMethod='ipcrypt-pfx'})) -- Send protobuf message before lookup
+ addResponseAction(AllRule(), RemoteLogResponseAction(rl, nil, true, {serverID='%s', ipEncryptKey=key, ipEncryptMethod='ipcrypt-pfx'})) -- Send protobuf message after lookup
+ """
+
+ def testProtobuf(self):
+ """
+ Protobuf: Send data to a protobuf server, with pseudonymization
+ """
+ name = 'query.protobuf-ipcipher.tests.powerdns.com.'
+
+ target = 'target.protobuf-ipcipher.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ target)
+ response.answer.append(rrset)
+
+ rrset = dns.rrset.from_text(target,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(response, receivedResponse)
+
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
+
+ # check the protobuf message corresponding to the UDP query
+ msg = self.getFirstProtobufMessage()
+
+ # 108.41.239.98 is 127.0.0.1 pseudonymized with ipcrypt2-pfx and the current key
+ self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name, '109.33.15.148')
+
+ # check the protobuf message corresponding to the UDP response
+ msg = self.getFirstProtobufMessage()
+ self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, response, '109.33.15.148')
+
+ self.assertEqual(len(msg.response.rrs), 2)
+ rr = msg.response.rrs[0]
+ self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.CNAME, name, 3600)
+ self.assertEqual(rr.rdata.decode('ascii'), target)
+ rr = msg.response.rrs[1]
+ self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, target, 3600)
+ self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1')
+
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ self.assertEqual(response, receivedResponse)
+
+ if self._protobufQueue.empty():
+ # let the protobuf messages the time to get there
+ time.sleep(1)
+
+ # check the protobuf message corresponding to the TCP query
+ msg = self.getFirstProtobufMessage()
+ # 108.41.239.98 is 127.0.0.1 pseudonymized with ipcrypt2-pfx and the current key
+ self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.TCP, query, dns.rdataclass.IN, dns.rdatatype.A, name, '109.33.15.148')
+
+ # check the protobuf message corresponding to the TCP response
+ msg = self.getFirstProtobufMessage()
+ self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.TCP, response, '109.33.15.148')
+ self.assertEqual(len(msg.response.rrs), 2)
+ rr = msg.response.rrs[0]
+ self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.CNAME, name, 3600)
+ self.assertEqual(rr.rdata.decode('ascii'), target)
+ rr = msg.response.rrs[1]
+ self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, target, 3600)
+ self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '127.0.0.1')
+
+
class TestProtobufQUIC(DNSDistProtobufTest):
_serverKey = 'server.key'