print(receivedResponse)
print(expectedResponse)
self.assertEquals(receivedResponse, expectedResponse)
+
+class TestAdvancedSetNegativeAndSOA(DNSDistTest):
+
+ _config_template = """
+ addAction("nxd.setnegativeandsoa.advanced.tests.powerdns.com.", SetNegativeAndSOAAction(true, "auth.", 42, "mname", "rname", 5, 4, 3, 2, 1))
+ addAction("nodata.setnegativeandsoa.advanced.tests.powerdns.com.", SetNegativeAndSOAAction(false, "another-auth.", 42, "mname", "rname", 1, 2, 3, 4, 5))
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testAdvancedNegativeAndSOANXD(self):
+ """
+ Advanced: SetNegativeAndSOAAction NXD
+ """
+ name = 'nxd.setnegativeandsoa.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
+ soa = dns.rrset.from_text("auth",
+ 42,
+ dns.rdataclass.IN,
+ dns.rdatatype.SOA,
+ 'mname. rname. 5 4 3 2 1')
+ expectedResponse.additional.append(soa)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
+
+ def testAdvancedNegativeAndSOANoData(self):
+ """
+ Advanced: SetNegativeAndSOAAction NoData
+ """
+ name = 'nodata.setnegativeandsoa.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.NOERROR)
+ soa = dns.rrset.from_text("another-auth",
+ 42,
+ dns.rdataclass.IN,
+ dns.rdatatype.SOA,
+ 'mname. rname. 1 2 3 4 5')
+ expectedResponse.additional.append(soa)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
+
+class TestAdvancedLuaRule(DNSDistTest):
+
+ _config_template = """
+
+ function luarulefunction(dq)
+ if dq.qname:toString() == 'lua-rule.advanced.tests.powerdns.com.' then
+ return true
+ end
+ return false
+ end
+
+ addAction(LuaRule(luarulefunction), RCodeAction(DNSRCode.NOTIMP))
+ addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
+ -- newServer{address="127.0.0.1:%s"}
+ """
+
+ def testAdvancedLuaRule(self):
+ """
+ Advanced: Test the LuaRule rule
+ """
+ name = 'lua-rule.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ notimplResponse = dns.message.make_response(query)
+ notimplResponse.set_rcode(dns.rcode.NOTIMP)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, notimplResponse)
+
+ name = 'not-lua-rule.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ refusedResponse = dns.message.make_response(query)
+ refusedResponse.set_rcode(dns.rcode.REFUSED)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, refusedResponse)
+
+class TestAdvancedLuaFFI(DNSDistTest):
+
+ _config_template = """
+ local ffi = require("ffi")
+
+ local expectingUDP = true
+
+ function luaffirulefunction(dq)
+ local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
+ if qtype ~= DNSQType.A and qtype ~= DNSQType.SOA then
+ print('invalid qtype')
+ return false
+ end
+
+ local qclass = ffi.C.dnsdist_ffi_dnsquestion_get_qclass(dq)
+ if qclass ~= DNSClass.IN then
+ print('invalid qclass')
+ return false
+ end
+
+ local ret_ptr = ffi.new("char *[1]")
+ local ret_ptr_param = ffi.cast("const char **", ret_ptr)
+ local ret_size = ffi.new("size_t[1]")
+ local ret_size_param = ffi.cast("size_t*", ret_size)
+ ffi.C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, ret_ptr_param, ret_size_param)
+ if ret_size[0] ~= 36 then
+ print('invalid length for the qname ')
+ print(ret_size[0])
+ return false
+ end
+
+ local expectedQname = string.char(6)..'luaffi'..string.char(8)..'advanced'..string.char(5)..'tests'..string.char(8)..'powerdns'..string.char(3)..'com'
+ if ffi.string(ret_ptr[0]) ~= expectedQname then
+ print('invalid qname')
+ print(ffi.string(ret_ptr[0]))
+ return false
+ end
+
+ local rcode = ffi.C.dnsdist_ffi_dnsquestion_get_rcode(dq)
+ if rcode ~= 0 then
+ print('invalid rcode')
+ return false
+ end
+
+ local opcode = ffi.C.dnsdist_ffi_dnsquestion_get_opcode(dq)
+ if qtype == DNSQType.A and opcode ~= DNSOpcode.Query then
+ print('invalid opcode')
+ return false
+ elseif qtype == DNSQType.SOA and opcode ~= DNSOpcode.Update then
+ print('invalid opcode')
+ return false
+ end
+
+ local tcp = ffi.C.dnsdist_ffi_dnsquestion_get_tcp(dq)
+ if expectingUDP == tcp then
+ print('invalid tcp')
+ return false
+ end
+ expectingUDP = expectingUDP == false
+
+ local dnssecok = ffi.C.dnsdist_ffi_dnsquestion_get_do(dq)
+ if dnssecok ~= false then
+ print('invalid DNSSEC OK')
+ return false
+ end
+
+ local len = ffi.C.dnsdist_ffi_dnsquestion_get_len(dq)
+ if len ~= 52 then
+ print('invalid length')
+ print(len)
+ return false
+ end
+
+ local tag = ffi.C.dnsdist_ffi_dnsquestion_get_tag(dq, 'a-tag')
+ if ffi.string(tag) ~= 'a-value' then
+ print('invalid tag value')
+ print(ffi.string(tag))
+ return false
+ end
+ return true
+ end
+
+ function luaffiactionfunction(dq)
+ local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
+ if qtype == DNSQType.A then
+ local str = "192.0.2.1"
+ local buf = ffi.new("char[?]", #str + 1)
+ ffi.copy(buf, str)
+ ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
+ return DNSAction.Spoof
+ elseif qtype == DNSQType.SOA then
+ ffi.C.dnsdist_ffi_dnsquestion_set_rcode(dq, DNSRCode.REFUSED)
+ return DNSAction.Refused
+ end
+ end
+
+ function luaffiactionsettag(dq)
+ ffi.C.dnsdist_ffi_dnsquestion_set_tag(dq, 'a-tag', 'a-value')
+ return DNSAction.None
+ end
+
+ addAction(AllRule(), LuaFFIAction(luaffiactionsettag))
+ addAction(LuaFFIRule(luaffirulefunction), LuaFFIAction(luaffiactionfunction))
+ -- newServer{address="127.0.0.1:%s"}
+ """
+
+ def testAdvancedLuaFFI(self):
+ """
+ Advanced: Test the Lua FFI interface
+ """
+ name = 'luaffi.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, response)
+
+ def testAdvancedLuaFFIUpdate(self):
+ """
+ Advanced: Test the Lua FFI interface via an update
+ """
+ name = 'luaffi.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'SOA', 'IN')
+ query.set_opcode(dns.opcode.UPDATE)
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.REFUSED)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, response)