git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_LuaFFI.py
auth: add options to DomainInfo
[thirdparty/pdns.git] / regression-tests.dnsdist / test_LuaFFI.py
Commit Line Data
7b9abc20
RG
1#!/usr/bin/env python
2
3import unittest
4import dns
5from dnsdisttests import DNSDistTest
6
class TestAdvancedLuaFFI(DNSDistTest):
    """
    Regression tests for the LuaFFIRule / LuaFFIAction bindings.

    The configuration first tags every query ('a-tag' = 'a-value'), then
    runs a rule that inspects the question through the FFI accessors
    (qtype, qclass, raw qname, rcode, opcode, transport, DO bit, packet
    length and tag). When the rule matches, A queries are spoofed with
    192.0.2.1 and SOA queries are refused.
    """

    # NOTE(review): the commented-out newServer line carries the only '%s'
    # placeholder of the template; presumably DNSDistTest fills it in with
    # the backend port when rendering the configuration - confirm.
    _config_template = """
    local ffi = require("ffi")

    local expectingUDP = true

    function luaffirulefunction(dq)
      local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
      if qtype ~= DNSQType.A and qtype ~= DNSQType.SOA then
        print('invalid qtype')
        return false
      end

      local qclass = ffi.C.dnsdist_ffi_dnsquestion_get_qclass(dq)
      if qclass ~= DNSClass.IN then
        print('invalid qclass')
        return false
      end

      local ret_ptr = ffi.new("char *[1]")
      local ret_ptr_param = ffi.cast("const char **", ret_ptr)
      local ret_size = ffi.new("size_t[1]")
      local ret_size_param = ffi.cast("size_t*", ret_size)
      ffi.C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, ret_ptr_param, ret_size_param)
      if ret_size[0] ~= 36 then
        print('invalid length for the qname ')
        print(ret_size[0])
        return false
      end

      local expectedQname = string.char(6)..'luaffi'..string.char(8)..'advanced'..string.char(5)..'tests'..string.char(8)..'powerdns'..string.char(3)..'com'
      if ffi.string(ret_ptr[0]) ~= expectedQname then
        print('invalid qname')
        print(ffi.string(ret_ptr[0]))
        return false
      end

      local rcode = ffi.C.dnsdist_ffi_dnsquestion_get_rcode(dq)
      if rcode ~= 0 then
        print('invalid rcode')
        return false
      end

      local opcode = ffi.C.dnsdist_ffi_dnsquestion_get_opcode(dq)
      if qtype == DNSQType.A and opcode ~= DNSOpcode.Query then
        print('invalid opcode')
        return false
      elseif qtype == DNSQType.SOA and opcode ~= DNSOpcode.Update then
        print('invalid opcode')
        return false
      end

      local tcp = ffi.C.dnsdist_ffi_dnsquestion_get_tcp(dq)
      if expectingUDP == tcp then
        print('invalid tcp')
        return false
      end
      expectingUDP = expectingUDP == false

      local dnssecok = ffi.C.dnsdist_ffi_dnsquestion_get_do(dq)
      if dnssecok ~= false then
        print('invalid DNSSEC OK')
        return false
      end

      local len = ffi.C.dnsdist_ffi_dnsquestion_get_len(dq)
      if len ~= 52 then
        print('invalid length')
        print(len)
        return false
      end

      local tag = ffi.C.dnsdist_ffi_dnsquestion_get_tag(dq, 'a-tag')
      if ffi.string(tag) ~= 'a-value' then
        print('invalid tag value')
        print(ffi.string(tag))
        return false
      end
      return true
    end

    function luaffiactionfunction(dq)
      local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
      if qtype == DNSQType.A then
        local str = "192.0.2.1"
        local buf = ffi.new("char[?]", #str + 1)
        ffi.copy(buf, str)
        ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
        return DNSAction.Spoof
      elseif qtype == DNSQType.SOA then
        ffi.C.dnsdist_ffi_dnsquestion_set_rcode(dq, DNSRCode.REFUSED)
        return DNSAction.Refused
      end
    end

    function luaffiactionsettag(dq)
      ffi.C.dnsdist_ffi_dnsquestion_set_tag(dq, 'a-tag', 'a-value')
      return DNSAction.None
    end

    addAction(AllRule(), LuaFFIAction(luaffiactionsettag))
    addAction(LuaFFIRule(luaffirulefunction), LuaFFIAction(luaffiactionfunction))
    -- newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedLuaFFI(self):
        """
        Lua FFI: Test the Lua FFI interface

        Sends an A query with RD cleared over UDP and then TCP, and
        expects a spoofed 192.0.2.1 answer (TTL 60) each time.
        """
        qname = 'luaffi.advanced.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD up front
        request.flags &= ~dns.flags.RD

        expected = dns.message.make_response(request)
        expected.answer.append(
            dns.rrset.from_text(qname,
                                60,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '192.0.2.1'))

        # UDP first, then TCP: the Lua rule alternates the transport it expects
        for transport in ("sendUDPQuery", "sendTCPQuery"):
            _, received = getattr(self, transport)(request, response=None, useQueue=False)
            self.assertEqual(received, expected)

    def testAdvancedLuaFFIUpdate(self):
        """
        Lua FFI: Test the Lua FFI interface via an update

        Sends an SOA question with the UPDATE opcode and RD cleared over
        UDP and then TCP, and expects a REFUSED response each time.
        """
        qname = 'luaffi.advanced.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'SOA', 'IN')
        request.set_opcode(dns.opcode.UPDATE)
        # dnsdist sets RA = RD for spoofed responses, so clear RD up front
        request.flags &= ~dns.flags.RD

        expected = dns.message.make_response(request)
        expected.set_rcode(dns.rcode.REFUSED)

        # UDP first, then TCP: the Lua rule alternates the transport it expects
        for transport in ("sendUDPQuery", "sendTCPQuery"):
            _, received = getattr(self, transport)(request, response=None, useQueue=False)
            self.assertEqual(received, expected)
152
class TestAdvancedLuaFFIPerThread(DNSDistTest):
    """
    Regression tests for the LuaFFIPerThreadRule / LuaFFIPerThreadAction
    bindings.

    Each rule and action is passed as a Lua source string that returns a
    function. The configuration tags every query ('a-tag' = 'a-value'),
    validates the question through the FFI accessors (qtype, qclass, raw
    qname, rcode, opcode, DO bit, packet length and tag), then spoofs A
    queries with 192.0.2.1 and refuses SOA queries.
    """

    # NOTE(review): the commented-out newServer line carries the only '%s'
    # placeholder of the template; presumably DNSDistTest fills it in with
    # the backend port when rendering the configuration - confirm.
    _config_template = """

    local rulefunction = [[
      local ffi = require("ffi")

      return function(dq)
        local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
        if qtype ~= DNSQType.A and qtype ~= DNSQType.SOA then
          print('invalid qtype')
          return false
        end

        local qclass = ffi.C.dnsdist_ffi_dnsquestion_get_qclass(dq)
        if qclass ~= DNSClass.IN then
          print('invalid qclass')
          return false
        end

        local ret_ptr = ffi.new("char *[1]")
        local ret_ptr_param = ffi.cast("const char **", ret_ptr)
        local ret_size = ffi.new("size_t[1]")
        local ret_size_param = ffi.cast("size_t*", ret_size)
        ffi.C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, ret_ptr_param, ret_size_param)
        if ret_size[0] ~= 45 then
          print('invalid length for the qname ')
          print(ret_size[0])
          return false
        end

        local expectedQname = string.char(15)..'luaffiperthread'..string.char(8)..'advanced'..string.char(5)..'tests'..string.char(8)..'powerdns'..string.char(3)..'com'
        if ffi.string(ret_ptr[0]) ~= expectedQname then
          print('invalid qname')
          print(ffi.string(ret_ptr[0]))
          return false
        end

        local rcode = ffi.C.dnsdist_ffi_dnsquestion_get_rcode(dq)
        if rcode ~= 0 then
          print('invalid rcode')
          return false
        end

        local opcode = ffi.C.dnsdist_ffi_dnsquestion_get_opcode(dq)
        if qtype == DNSQType.A and opcode ~= DNSOpcode.Query then
          print('invalid opcode')
          return false
        elseif qtype == DNSQType.SOA and opcode ~= DNSOpcode.Update then
          print('invalid opcode')
          return false
        end

        local dnssecok = ffi.C.dnsdist_ffi_dnsquestion_get_do(dq)
        if dnssecok ~= false then
          print('invalid DNSSEC OK')
          return false
        end

        local len = ffi.C.dnsdist_ffi_dnsquestion_get_len(dq)
        if len ~= 61 then
          print('invalid length')
          print(len)
          return false
        end

        local tag = ffi.C.dnsdist_ffi_dnsquestion_get_tag(dq, 'a-tag')
        if ffi.string(tag) ~= 'a-value' then
          print('invalid tag value')
          print(ffi.string(tag))
          return false
        end

        return true
      end
    ]]

    local actionfunction = [[
      local ffi = require("ffi")

      return function(dq)
        local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
        if qtype == DNSQType.A then
          local str = "192.0.2.1"
          local buf = ffi.new("char[?]", #str + 1)
          ffi.copy(buf, str)
          ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
          return DNSAction.Spoof
        elseif qtype == DNSQType.SOA then
          ffi.C.dnsdist_ffi_dnsquestion_set_rcode(dq, DNSRCode.REFUSED)
          return DNSAction.Refused
        end
      end
    ]]

    local settagfunction = [[
      local ffi = require("ffi")

      return function(dq)
        ffi.C.dnsdist_ffi_dnsquestion_set_tag(dq, 'a-tag', 'a-value')
        return DNSAction.None
      end
    ]]

    addAction(AllRule(), LuaFFIPerThreadAction(settagfunction))
    addAction(LuaFFIPerThreadRule(rulefunction), LuaFFIPerThreadAction(actionfunction))
    -- newServer{address="127.0.0.1:%s"}
    """

    def testAdvancedLuaPerthreadFFI(self):
        """
        Lua FFI: Test the Lua FFI per-thread interface

        Sends an A query with RD cleared over UDP and then TCP, and
        expects a spoofed 192.0.2.1 answer (TTL 60) each time.
        """
        qname = 'luaffiperthread.advanced.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'A', 'IN')
        # dnsdist sets RA = RD for spoofed responses, so clear RD up front
        request.flags &= ~dns.flags.RD

        expected = dns.message.make_response(request)
        expected.answer.append(
            dns.rrset.from_text(qname,
                                60,
                                dns.rdataclass.IN,
                                dns.rdatatype.A,
                                '192.0.2.1'))

        # Exercise both transports with the very same query
        for transport in ("sendUDPQuery", "sendTCPQuery"):
            _, received = getattr(self, transport)(request, response=None, useQueue=False)
            self.assertEqual(received, expected)

    def testAdvancedLuaFFIPerThreadUpdate(self):
        """
        Lua FFI: Test the Lua FFI per-thread interface via an update

        Sends an SOA question with the UPDATE opcode and RD cleared over
        UDP and then TCP, and expects a REFUSED response each time.
        """
        qname = 'luaffiperthread.advanced.tests.powerdns.com.'
        request = dns.message.make_query(qname, 'SOA', 'IN')
        request.set_opcode(dns.opcode.UPDATE)
        # dnsdist sets RA = RD for spoofed responses, so clear RD up front
        request.flags &= ~dns.flags.RD

        expected = dns.message.make_response(request)
        expected.set_rcode(dns.rcode.REFUSED)

        # Exercise both transports with the very same query
        for transport in ("sendUDPQuery", "sendTCPQuery"):
            _, received = getattr(self, transport)(request, response=None, useQueue=False)
            self.assertEqual(received, expected)