]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.recursor-dnssec/test_Protobuf.py
Make sure we can install unsigned packages.
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_Protobuf.py
index 9a5d72fc4c5032b6ef5f9f376d93125f6f3fb206..689f5478db1966aa9c7af68acfc55c2620d0c234 100644 (file)
@@ -202,6 +202,9 @@ class TestRecursorProtobuf(RecursorTest):
         self.assertEquals(msg.response.appliedPolicyType, policyType)
 
     def checkProtobufTags(self, msg, tags):
+        print(tags)
+        print('---')
+        print(msg.response.tags)
         self.assertEquals(len(msg.response.tags), len(tags))
         for tag in msg.response.tags:
             self.assertTrue(tag in tags)
@@ -550,7 +553,7 @@ auth-zones=example=configs/%s/example.zone""" % _confdir
         # check the protobuf messages corresponding to the UDP query and answer
         msg = self.getFirstProtobufMessage()
         self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
-        self.checkProtobufTags(msg, [self._tag_from_gettag])
+        self.checkProtobufTags(msg, [ self._tag_from_gettag ])
         # then the response
         msg = self.getFirstProtobufMessage()
         self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
@@ -559,7 +562,7 @@ auth-zones=example=configs/%s/example.zone""" % _confdir
         # we have max-cache-ttl set to 15
         self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
         self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.84')
-        tags = [self._tag_from_gettag] + self._tags
+        tags = [ self._tag_from_gettag ] + self._tags
         self.checkProtobufTags(msg, tags)
         self.checkNoRemainingMessage()
 
@@ -809,3 +812,125 @@ auth-zones=example=configs/%s/example.zone""" % _confdir
       return 0
     end
     """ % (ProtobufTaggedExtraFieldsTest._requestorId, ProtobufTaggedExtraFieldsTest._deviceId, ProtobufTaggedExtraFieldsTest._deviceName)
+
+class ProtobufRPZTest(TestRecursorProtobuf):
+    """
+    This test makes sure that we correctly export the RPZ applied policy in our protobuf messages
+    """
+
+    _confdir = 'ProtobufRPZ'
+    _config_template = """
+auth-zones=example=configs/%s/example.rpz.zone""" % _confdir
+    _lua_config_file = """
+    protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=true, logResponses=true } )
+    rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz."})
+    """ % (protobufServersParameters[0].port, protobufServersParameters[1].port, _confdir)
+
+    @classmethod
+    def generateRecursorConfig(cls, confdir):
+        authzonepath = os.path.join(confdir, 'example.rpz.zone')
+        with open(authzonepath, 'w') as authzone:
+            authzone.write("""$ORIGIN example.
+@ 3600 IN SOA {soa}
+sub.test 3600 IN A 192.0.2.42
+""".format(soa=cls._SOA))
+
+        rpzFilePath = os.path.join(confdir, 'zone.rpz')
+        with open(rpzFilePath, 'w') as rpzZone:
+            rpzZone.write("""$ORIGIN zone.rpz.
+@ 3600 IN SOA {soa}
+*.test.example.zone.rpz. 60 IN CNAME rpz-passthru.
+""".format(soa=cls._SOA))
+
+        super(ProtobufRPZTest, cls).generateRecursorConfig(confdir)
+
+    def testA(self):
+        name = 'sub.test.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf messages corresponding to the UDP query and answer
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+
+        # then the response
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
+        self.checkProtobufPolicy(msg, dnsmessage_pb2.PBDNSMessage.PolicyType.QNAME, 'zone.rpz.')
+        self.assertEquals(len(msg.response.rrs), 1)
+        rr = msg.response.rrs[0]
+        # we have max-cache-ttl set to 15
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+        self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
+        self.checkNoRemainingMessage()
+
+class ProtobufRPZTagsTest(TestRecursorProtobuf):
+    """
+    This test makes sure that we correctly export the RPZ tags in our protobuf messages
+    """
+
+    _confdir = 'ProtobufRPZTags'
+    _config_template = """
+auth-zones=example=configs/%s/example.rpz.zone""" % _confdir
+    _tags = ['tag1', 'tag2']
+    _tags_from_gettag = ['tag1-from-gettag', 'tag2-from-gettag']
+    _tags_from_rpz = ['tag1-from-rpz', 'tag2-from-rpz' ]
+    _lua_config_file = """
+    protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=true, logResponses=true, tags={'tag1', 'tag2'} } )
+    rpzFile('configs/%s/zone.rpz', { policyName="zone.rpz.", tags={ '%s', '%s'} })
+    """ % (protobufServersParameters[0].port, protobufServersParameters[1].port, _confdir, _tags_from_rpz[0], _tags_from_rpz[1])
+    _lua_dns_script_file = """
+    function gettag(remote, ednssubnet, localip, qname, qtype, ednsoptions, tcp)
+      return 0, { '%s', '%s' }
+    end
+    function preresolve(dq)
+      dq:addPolicyTag('%s')
+      dq:addPolicyTag('%s')
+      return false
+    end
+    """ % (_tags_from_gettag[0], _tags_from_gettag[1], _tags[0], _tags[1])
+
+    @classmethod
+    def generateRecursorConfig(cls, confdir):
+        authzonepath = os.path.join(confdir, 'example.rpz.zone')
+        with open(authzonepath, 'w') as authzone:
+            authzone.write("""$ORIGIN example.
+@ 3600 IN SOA {soa}
+sub.test 3600 IN A 192.0.2.42
+""".format(soa=cls._SOA))
+
+        rpzFilePath = os.path.join(confdir, 'zone.rpz')
+        with open(rpzFilePath, 'w') as rpzZone:
+            rpzZone.write("""$ORIGIN zone.rpz.
+@ 3600 IN SOA {soa}
+*.test.example.zone.rpz. 60 IN CNAME rpz-passthru.
+""".format(soa=cls._SOA))
+
+        super(ProtobufRPZTagsTest, cls).generateRecursorConfig(confdir)
+
+    def testA(self):
+        name = 'sub.test.example.'
+        expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.42')
+        query = dns.message.make_query(name, 'A', want_dnssec=True)
+        query.flags |= dns.flags.CD
+        res = self.sendUDPQuery(query)
+        self.assertRRsetInAnswer(res, expected)
+
+        # check the protobuf messages corresponding to the UDP query and answer
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
+
+        # then the response
+        msg = self.getFirstProtobufMessage()
+        self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
+        self.checkProtobufPolicy(msg, dnsmessage_pb2.PBDNSMessage.PolicyType.QNAME, 'zone.rpz.')
+        self.checkProtobufTags(msg, self._tags + self._tags_from_gettag + self._tags_from_rpz)
+        self.assertEquals(len(msg.response.rrs), 1)
+        rr = msg.response.rrs[0]
+        # we have max-cache-ttl set to 15
+        self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
+        self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
+        self.checkNoRemainingMessage()