]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_LuaFFI.py
dnsdist: Add regression tests for DNS header set/get via Lua FFI
[thirdparty/pdns.git] / regression-tests.dnsdist / test_LuaFFI.py
index 22d2deafb8ac30c39633a49116804cc790c1effc..0b8c42a300ab0c606c7a3037013e5b4475188afd 100644 (file)
@@ -83,6 +83,16 @@ class TestAdvancedLuaFFI(DNSDistTest):
         print(ffi.string(tag))
         return false
       end
+
+      local raw_tag_buf_size = 255
+      local raw_tag_buf = ffi.new("char [?]", raw_tag_buf_size)
+      local raw_tag_size = ffi.C.dnsdist_ffi_dnsquestion_get_tag_raw(dq, 'raw-tag', raw_tag_buf, raw_tag_buf_size)
+      if ffi.string(raw_tag_buf, raw_tag_size) ~= 'a\0b' then
+        print('invalid raw tag value')
+        print(ffi.string(raw_tag_buf,  raw_tag_size))
+        return false
+      end
+
       return true
     end
 
@@ -105,9 +115,16 @@ class TestAdvancedLuaFFI(DNSDistTest):
       return DNSAction.None
     end
 
+    function luaffiactionsettagraw(dq)
+      local value = "a\0b"
+      ffi.C.dnsdist_ffi_dnsquestion_set_tag_raw(dq, 'raw-tag', value, #value)
+      return DNSAction.None
+    end
+
     addAction(AllRule(), LuaFFIAction(luaffiactionsettag))
+    addAction(AllRule(), LuaFFIAction(luaffiactionsettagraw))
     addAction(LuaFFIRule(luaffirulefunction), LuaFFIAction(luaffiactionfunction))
-    -- newServer{address="127.0.0.1:%s"}
+    -- newServer{address="127.0.0.1:%d"}
     """
 
     def testAdvancedLuaFFI(self):
@@ -256,7 +273,7 @@ class TestAdvancedLuaFFIPerThread(DNSDistTest):
 
     addAction(AllRule(), LuaFFIPerThreadAction(settagfunction))
     addAction(LuaFFIPerThreadRule(rulefunction), LuaFFIPerThreadAction(actionfunction))
-    -- newServer{address="127.0.0.1:%s"}
+    -- newServer{address="127.0.0.1:%d"}
     """
 
     def testAdvancedLuaPerthreadFFI(self):
@@ -298,3 +315,85 @@ class TestAdvancedLuaFFIPerThread(DNSDistTest):
             sender = getattr(self, method)
             (_, receivedResponse) = sender(query, response=None, useQueue=False)
             self.assertEqual(receivedResponse, response)
+
+class TestLuaFFIHeader(DNSDistTest):
+
+    _config_template = """
+    local bit = require("bit")
+    local ffi = require("ffi")
+
+    function setAAResponseAction(dr)
+      local header_void = ffi.C.dnsdist_ffi_dnsquestion_get_header(dr)
+      local header = ffi.cast("unsigned char *", header_void)
+      -- get AA
+      local aa = bit.band(header[2], bit.lshift(1, 2)) ~= 0
+      if aa then
+          ffi.C.dnsdist_ffi_dnsquestion_set_rcode(dr, DNSRCode.REFUSED)
+          return DNSResponseAction.HeaderModify
+      end
+      -- set AA=1
+      header[2] = bit.bor(header[2], bit.lshift(1, 2))
+      return DNSResponseAction.None
+    end
+
+    addResponseAction(AllRule(), LuaFFIResponseAction(setAAResponseAction))
+    newServer{address="127.0.0.1:%d"}
+    """
+    _verboseMode = True
+
+    def testLuaFFISetAAHeader(self):
+        """
+        Lua FFI: Set AA=1
+        """
+        name = 'dnsheader-set-aa.luaffi.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.1')
+        response.answer.append(rrset)
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.1')
+        expectedResponse.answer.append(rrset)
+        expectedResponse.flags |= dns.flags.AA
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            receivedQuery.id = query.id
+            self.assertEqual(query, receivedQuery)
+            self.assertEqual(expectedResponse, receivedResponse)
+
+    def testLuaFFIgetAAHeader(self):
+        """
+        Lua FFI: check AA=0
+        """
+        name = 'dnsheader-get-aa.luaffi.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    60,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '192.0.2.1')
+        response.answer.append(rrset)
+        response.flags |= dns.flags.AA
+        expectedResponse = dns.message.make_response(query)
+        expectedResponse.flags |= dns.flags.AA
+        expectedResponse.set_rcode(dns.rcode.REFUSED)
+        expectedResponse.answer.append(rrset)
+
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            receivedQuery.id = query.id
+            self.assertEqual(query, receivedQuery)
+            self.assertEqual(expectedResponse, receivedResponse)