]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_LuaFFI.py
5 from dnsdisttests
import DNSDistTest
7 class TestAdvancedLuaFFI ( DNSDistTest
):
10 local ffi = require("ffi")
12 local expectingUDP = true
14 function luaffirulefunction(dq)
15 local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
16 if qtype ~= DNSQType.A and qtype ~= DNSQType.SOA then
17 print('invalid qtype')
21 local qclass = ffi.C.dnsdist_ffi_dnsquestion_get_qclass(dq)
22 if qclass ~= DNSClass.IN then
23 print('invalid qclass')
27 local ret_ptr = ffi.new("char *[1]")
28 local ret_ptr_param = ffi.cast("const char **", ret_ptr)
29 local ret_size = ffi.new("size_t[1]")
30 local ret_size_param = ffi.cast("size_t*", ret_size)
31 ffi.C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, ret_ptr_param, ret_size_param)
32 if ret_size[0] ~= 36 then
33 print('invalid length for the qname ')
38 local expectedQname = string.char(6)..'luaffi'..string.char(8)..'advanced'..string.char(5)..'tests'..string.char(8)..'powerdns'..string.char(3)..'com'
39 if ffi.string(ret_ptr[0]) ~= expectedQname then
40 print('invalid qname')
41 print(ffi.string(ret_ptr[0]))
45 local rcode = ffi.C.dnsdist_ffi_dnsquestion_get_rcode(dq)
47 print('invalid rcode')
51 local opcode = ffi.C.dnsdist_ffi_dnsquestion_get_opcode(dq)
52 if qtype == DNSQType.A and opcode ~= DNSOpcode.Query then
53 print('invalid opcode')
55 elseif qtype == DNSQType.SOA and opcode ~= DNSOpcode.Update then
56 print('invalid opcode')
60 local tcp = ffi.C.dnsdist_ffi_dnsquestion_get_tcp(dq)
61 if expectingUDP == tcp then
65 expectingUDP = expectingUDP == false
67 local dnssecok = ffi.C.dnsdist_ffi_dnsquestion_get_do(dq)
68 if dnssecok ~= false then
69 print('invalid DNSSEC OK')
73 local len = ffi.C.dnsdist_ffi_dnsquestion_get_len(dq)
75 print('invalid length')
80 local tag = ffi.C.dnsdist_ffi_dnsquestion_get_tag(dq, 'a-tag')
81 if ffi.string(tag) ~= 'a-value' then
82 print('invalid tag value')
83 print(ffi.string(tag))
89 function luaffiactionfunction(dq)
90 local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
91 if qtype == DNSQType.A then
92 local str = "192.0.2.1"
93 local buf = ffi.new("char[?]", #str + 1)
95 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
96 return DNSAction.Spoof
97 elseif qtype == DNSQType.SOA then
98 ffi.C.dnsdist_ffi_dnsquestion_set_rcode(dq, DNSRCode.REFUSED)
99 return DNSAction.Refused
103 function luaffiactionsettag(dq)
104 ffi.C.dnsdist_ffi_dnsquestion_set_tag(dq, 'a-tag', 'a-value')
105 return DNSAction.None
108 addAction(AllRule(), LuaFFIAction(luaffiactionsettag))
109 addAction(LuaFFIRule(luaffirulefunction), LuaFFIAction(luaffiactionfunction))
110 -- newServer{address="127.0.0.1: %s "}
def testAdvancedLuaFFI(self):
    """
    Lua FFI: Test the Lua FFI interface

    Builds an A/IN query for 'luaffi.advanced.tests.powerdns.com.' with the
    RD flag cleared, sends it through both sendUDPQuery and sendTCPQuery
    (with response=None and useQueue=False), and asserts that each received
    response equals a locally built response holding one A record.
    """
    name = 'luaffi.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist set RA = RD for spoofed responses
    query.flags &= ~dns.flags.RD

    response = dns.message.make_response(query)
    # NOTE(review): the arguments after `name` were missing from this call.
    # Class IN / type A mirror the query above and '192.0.2.1' is the
    # address the Lua action in this class's config passes to
    # dnsdist_ffi_dnsquestion_set_result; the TTL of 60 is an assumption
    # -- confirm against the upstream test.
    rrset = dns.rrset.from_text(name,
                                60,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '192.0.2.1')
    response.answer.append(rrset)

    # Exercise the same query over both transports.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, response)
def testAdvancedLuaFFIUpdate(self):
    """
    Lua FFI: Test the Lua FFI interface via an update

    Builds a SOA/IN message for 'luaffi.advanced.tests.powerdns.com.' with
    its opcode set to UPDATE and the RD flag cleared, sends it through both
    sendUDPQuery and sendTCPQuery (with response=None and useQueue=False),
    and asserts that each received response equals a locally built response
    whose rcode is REFUSED.
    """
    qname = 'luaffi.advanced.tests.powerdns.com.'

    update = dns.message.make_query(qname, 'SOA', 'IN')
    update.set_opcode(dns.opcode.UPDATE)
    # dnsdist sets RA = RD for spoofed responses, so clear RD up front.
    update.flags &= ~dns.flags.RD

    # The response we expect back: same question, REFUSED rcode.
    expected = dns.message.make_response(update)
    expected.set_rcode(dns.rcode.REFUSED)

    # Exercise the same message over both transports.
    for transport in ("sendUDPQuery", "sendTCPQuery"):
        send = getattr(self, transport)
        _, received = send(update, response=None, useQueue=False)
        self.assertEqual(received, expected)
153 class TestAdvancedLuaFFIPerThread ( DNSDistTest
):
155 _config_template
= """
157 local rulefunction = [[
158 local ffi = require("ffi")
161 local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
162 if qtype ~= DNSQType.A and qtype ~= DNSQType.SOA then
163 print('invalid qtype')
167 local qclass = ffi.C.dnsdist_ffi_dnsquestion_get_qclass(dq)
168 if qclass ~= DNSClass.IN then
169 print('invalid qclass')
173 local ret_ptr = ffi.new("char *[1]")
174 local ret_ptr_param = ffi.cast("const char **", ret_ptr)
175 local ret_size = ffi.new("size_t[1]")
176 local ret_size_param = ffi.cast("size_t*", ret_size)
177 ffi.C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, ret_ptr_param, ret_size_param)
178 if ret_size[0] ~= 45 then
179 print('invalid length for the qname ')
184 local expectedQname = string.char(15)..'luaffiperthread'..string.char(8)..'advanced'..string.char(5)..'tests'..string.char(8)..'powerdns'..string.char(3)..'com'
185 if ffi.string(ret_ptr[0]) ~= expectedQname then
186 print('invalid qname')
187 print(ffi.string(ret_ptr[0]))
191 local rcode = ffi.C.dnsdist_ffi_dnsquestion_get_rcode(dq)
193 print('invalid rcode')
197 local opcode = ffi.C.dnsdist_ffi_dnsquestion_get_opcode(dq)
198 if qtype == DNSQType.A and opcode ~= DNSOpcode.Query then
199 print('invalid opcode')
201 elseif qtype == DNSQType.SOA and opcode ~= DNSOpcode.Update then
202 print('invalid opcode')
206 local dnssecok = ffi.C.dnsdist_ffi_dnsquestion_get_do(dq)
207 if dnssecok ~= false then
208 print('invalid DNSSEC OK')
212 local len = ffi.C.dnsdist_ffi_dnsquestion_get_len(dq)
214 print('invalid length')
219 local tag = ffi.C.dnsdist_ffi_dnsquestion_get_tag(dq, 'a-tag')
220 if ffi.string(tag) ~= 'a-value' then
221 print('invalid tag value')
222 print(ffi.string(tag))
230 local actionfunction = [[
231 local ffi = require("ffi")
234 local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
235 if qtype == DNSQType.A then
236 local str = "192.0.2.1"
237 local buf = ffi.new("char[?]", #str + 1)
239 ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
240 return DNSAction.Spoof
241 elseif qtype == DNSQType.SOA then
242 ffi.C.dnsdist_ffi_dnsquestion_set_rcode(dq, DNSRCode.REFUSED)
243 return DNSAction.Refused
248 local settagfunction = [[
249 local ffi = require("ffi")
252 ffi.C.dnsdist_ffi_dnsquestion_set_tag(dq, 'a-tag', 'a-value')
253 return DNSAction.None
257 addAction(AllRule(), LuaFFIPerThreadAction(settagfunction))
258 addAction(LuaFFIPerThreadRule(rulefunction), LuaFFIPerThreadAction(actionfunction))
259 -- newServer{address="127.0.0.1: %s "}
def testAdvancedLuaPerthreadFFI(self):
    """
    Lua FFI: Test the Lua FFI per-thread interface

    Builds an A/IN query for 'luaffiperthread.advanced.tests.powerdns.com.'
    with the RD flag cleared, sends it through both sendUDPQuery and
    sendTCPQuery (with response=None and useQueue=False), and asserts that
    each received response equals a locally built response holding one A
    record.
    """
    name = 'luaffiperthread.advanced.tests.powerdns.com.'
    query = dns.message.make_query(name, 'A', 'IN')
    # dnsdist set RA = RD for spoofed responses
    query.flags &= ~dns.flags.RD

    response = dns.message.make_response(query)
    # NOTE(review): the arguments after `name` were missing from this call.
    # Class IN / type A mirror the query above and '192.0.2.1' is the
    # address the per-thread Lua action in this class's config passes to
    # dnsdist_ffi_dnsquestion_set_result; the TTL of 60 is an assumption
    # -- confirm against the upstream test.
    rrset = dns.rrset.from_text(name,
                                60,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '192.0.2.1')
    response.answer.append(rrset)

    # Exercise the same query over both transports.
    for method in ("sendUDPQuery", "sendTCPQuery"):
        sender = getattr(self, method)
        (_, receivedResponse) = sender(query, response=None, useQueue=False)
        self.assertEqual(receivedResponse, response)
def testAdvancedLuaFFIPerThreadUpdate(self):
    """
    Lua FFI: Test the Lua FFI per-thread interface via an update

    Builds a SOA/IN message for
    'luaffiperthread.advanced.tests.powerdns.com.' with its opcode set to
    UPDATE and the RD flag cleared, sends it through both sendUDPQuery and
    sendTCPQuery (with response=None and useQueue=False), and asserts that
    each received response equals a locally built response whose rcode is
    REFUSED.
    """
    qname = 'luaffiperthread.advanced.tests.powerdns.com.'

    update = dns.message.make_query(qname, 'SOA', 'IN')
    update.set_opcode(dns.opcode.UPDATE)
    # dnsdist sets RA = RD for spoofed responses, so clear RD up front.
    update.flags &= ~dns.flags.RD

    # The response we expect back: same question, REFUSED rcode.
    expected = dns.message.make_response(update)
    expected.set_rcode(dns.rcode.REFUSED)

    # Exercise the same message over both transports.
    for transport in ("sendUDPQuery", "sendTCPQuery"):
        send = getattr(self, transport)
        _, received = send(update, response=None, useQueue=False)
        self.assertEqual(received, expected)