"""
name = 'anytruncatetcp.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'ANY', 'IN')
+ # dnsdist sets RA = RD for TC responses
+ query.flags &= ~dns.flags.RD
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
"""
name = 'andnot.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'TXT', 'IN')
+ # dnsdist sets RA = RD for TC responses
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.NOTIMP)
"""
name = 'aorudp.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
+ query.flags &= ~dns.flags.RD
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
3600,
"""
name = 'aorudp.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.NOTIMP)
"""
name = 'qpsnone.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
"""
name = 'nmgrule.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
name = 'dstportrule.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
# more than 6 labels, the query should be refused
name = 'not.ok.labelscount.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
# less than 5 labels, the query should be refused
name = 'labelscountadvanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
# too short, the query should be refused
name = 'short.qnamewirelength.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
# too long, the query should be refused
name = 'toolongtobevalid.qnamewirelength.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
# this one should be refused
name = 'notincludedir.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
"""
name = 'tc.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist sets RA = RD for TC responses
+ query.flags &= ~dns.flags.RD
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
3600,
query = dns.message.make_query(name, 'A', 'IN')
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
+ expectedResponse.flags |= dns.flags.RA
for method in ("sendUDPQuery", "sendTCPQuery"):
sender = getattr(self, method)
addAction(EDNSVersionRule(0), ERCodeAction(DNSRCode.BADVERS))
"""
- def testDropped(self):
+ def testBadVers(self):
"""
- Advanced: A question with ECS version larger than 0 is dropped
+ Advanced: A question with ECS version larger than 0 yields BADVERS
"""
name = 'ednsversionrule.advanced.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN', use_edns=1)
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.BADVERS)
(_, receivedResponse) = sender(query, response=None, useQueue=False)
self.assertTrue(receivedResponse)
self.assertEquals(expectedResponse, receivedResponse)
+
+class TestAdvancedContinueAction(DNSDistTest):
+
+ _config_template = """
+ newServer{address="127.0.0.1:%s", pool="mypool"}
+ addAction("nocontinue.continue-action.advanced.tests.powerdns.com.", PoolAction("mypool"))
+ addAction("continue.continue-action.advanced.tests.powerdns.com.", ContinueAction(PoolAction("mypool")))
+ addAction(AllRule(), DisableValidationAction())
+ """
+
+ def testNoContinue(self):
+ """
+ Advanced: Query routed to pool, PoolAction should be terminal
+ """
+
+ name = 'nocontinue.continue-action.advanced.tests.powerdns.com.'
+
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedQuery = dns.message.make_query(name, 'A', 'IN')
+
+ response = dns.message.make_response(query)
+ expectedResponse = dns.message.make_response(query)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertEquals(receivedQuery, expectedQuery)
+ self.assertEquals(receivedResponse, expectedResponse)
+
+ def testNoContinue(self):
+ """
+ Advanced: Query routed to pool, ContinueAction() should not stop the processing
+ """
+
+ name = 'continue.continue-action.advanced.tests.powerdns.com.'
+
+ query = dns.message.make_query(name, 'A', 'IN')
+ expectedQuery = dns.message.make_query(name, 'A', 'IN')
+ expectedQuery.flags |= dns.flags.CD
+
+ response = dns.message.make_response(query)
+ expectedResponse = dns.message.make_response(query)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ expectedQuery.id = receivedQuery.id
+ self.assertEquals(receivedQuery, expectedQuery)
+ print(receivedResponse)
+ print(expectedResponse)
+ self.assertEquals(receivedResponse, expectedResponse)
+
+class TestAdvancedSetNegativeAndSOA(DNSDistTest):
+
+ _config_template = """
+ addAction("nxd.setnegativeandsoa.advanced.tests.powerdns.com.", SetNegativeAndSOAAction(true, "auth.", 42, "mname", "rname", 5, 4, 3, 2, 1))
+ addAction("nodata.setnegativeandsoa.advanced.tests.powerdns.com.", SetNegativeAndSOAAction(false, "another-auth.", 42, "mname", "rname", 1, 2, 3, 4, 5))
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testAdvancedNegativeAndSOANXD(self):
+ """
+ Advanced: SetNegativeAndSOAAction NXD
+ """
+ name = 'nxd.setnegativeandsoa.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.NXDOMAIN)
+ soa = dns.rrset.from_text("auth",
+ 42,
+ dns.rdataclass.IN,
+ dns.rdatatype.SOA,
+ 'mname. rname. 5 4 3 2 1')
+ expectedResponse.additional.append(soa)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
+
+ def testAdvancedNegativeAndSOANoData(self):
+ """
+ Advanced: SetNegativeAndSOAAction NoData
+ """
+ name = 'nodata.setnegativeandsoa.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.NOERROR)
+ soa = dns.rrset.from_text("another-auth",
+ 42,
+ dns.rdataclass.IN,
+ dns.rdatatype.SOA,
+ 'mname. rname. 1 2 3 4 5')
+ expectedResponse.additional.append(soa)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
+
+class TestAdvancedLuaRule(DNSDistTest):
+
+ _config_template = """
+
+ function luarulefunction(dq)
+ if dq.qname:toString() == 'lua-rule.advanced.tests.powerdns.com.' then
+ return true
+ end
+ return false
+ end
+
+ addAction(LuaRule(luarulefunction), RCodeAction(DNSRCode.NOTIMP))
+ addAction(AllRule(), RCodeAction(DNSRCode.REFUSED))
+ -- newServer{address="127.0.0.1:%s"}
+ """
+
+ def testAdvancedLuaRule(self):
+ """
+ Advanced: Test the LuaRule rule
+ """
+ name = 'lua-rule.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ notimplResponse = dns.message.make_response(query)
+ notimplResponse.set_rcode(dns.rcode.NOTIMP)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, notimplResponse)
+
+ name = 'not-lua-rule.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ refusedResponse = dns.message.make_response(query)
+ refusedResponse.set_rcode(dns.rcode.REFUSED)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, refusedResponse)
+
+class TestAdvancedLuaFFI(DNSDistTest):
+
+ _config_template = """
+ local ffi = require("ffi")
+
+ local expectingUDP = true
+
+ function luaffirulefunction(dq)
+ local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
+ if qtype ~= DNSQType.A and qtype ~= DNSQType.SOA then
+ print('invalid qtype')
+ return false
+ end
+
+ local qclass = ffi.C.dnsdist_ffi_dnsquestion_get_qclass(dq)
+ if qclass ~= DNSClass.IN then
+ print('invalid qclass')
+ return false
+ end
+
+ local ret_ptr = ffi.new("char *[1]")
+ local ret_ptr_param = ffi.cast("const char **", ret_ptr)
+ local ret_size = ffi.new("size_t[1]")
+ local ret_size_param = ffi.cast("size_t*", ret_size)
+ ffi.C.dnsdist_ffi_dnsquestion_get_qname_raw(dq, ret_ptr_param, ret_size_param)
+ if ret_size[0] ~= 36 then
+ print('invalid length for the qname ')
+ print(ret_size[0])
+ return false
+ end
+
+ local expectedQname = string.char(6)..'luaffi'..string.char(8)..'advanced'..string.char(5)..'tests'..string.char(8)..'powerdns'..string.char(3)..'com'
+ if ffi.string(ret_ptr[0]) ~= expectedQname then
+ print('invalid qname')
+ print(ffi.string(ret_ptr[0]))
+ return false
+ end
+
+ local rcode = ffi.C.dnsdist_ffi_dnsquestion_get_rcode(dq)
+ if rcode ~= 0 then
+ print('invalid rcode')
+ return false
+ end
+
+ local opcode = ffi.C.dnsdist_ffi_dnsquestion_get_opcode(dq)
+ if qtype == DNSQType.A and opcode ~= DNSOpcode.Query then
+ print('invalid opcode')
+ return false
+ elseif qtype == DNSQType.SOA and opcode ~= DNSOpcode.Update then
+ print('invalid opcode')
+ return false
+ end
+
+ local tcp = ffi.C.dnsdist_ffi_dnsquestion_get_tcp(dq)
+ if expectingUDP == tcp then
+ print('invalid tcp')
+ return false
+ end
+ expectingUDP = expectingUDP == false
+
+ local dnssecok = ffi.C.dnsdist_ffi_dnsquestion_get_do(dq)
+ if dnssecok ~= false then
+ print('invalid DNSSEC OK')
+ return false
+ end
+
+ local len = ffi.C.dnsdist_ffi_dnsquestion_get_len(dq)
+ if len ~= 52 then
+ print('invalid length')
+ print(len)
+ return false
+ end
+
+ local tag = ffi.C.dnsdist_ffi_dnsquestion_get_tag(dq, 'a-tag')
+ if ffi.string(tag) ~= 'a-value' then
+ print('invalid tag value')
+ print(ffi.string(tag))
+ return false
+ end
+ return true
+ end
+
+ function luaffiactionfunction(dq)
+ local qtype = ffi.C.dnsdist_ffi_dnsquestion_get_qtype(dq)
+ if qtype == DNSQType.A then
+ local str = "192.0.2.1"
+ local buf = ffi.new("char[?]", #str + 1)
+ ffi.copy(buf, str)
+ ffi.C.dnsdist_ffi_dnsquestion_set_result(dq, buf, #str)
+ return DNSAction.Spoof
+ elseif qtype == DNSQType.SOA then
+ ffi.C.dnsdist_ffi_dnsquestion_set_rcode(dq, DNSRCode.REFUSED)
+ return DNSAction.Refused
+ end
+ end
+
+ function luaffiactionsettag(dq)
+ ffi.C.dnsdist_ffi_dnsquestion_set_tag(dq, 'a-tag', 'a-value')
+ return DNSAction.None
+ end
+
+ addAction(AllRule(), LuaFFIAction(luaffiactionsettag))
+ addAction(LuaFFIRule(luaffirulefunction), LuaFFIAction(luaffiactionfunction))
+ -- newServer{address="127.0.0.1:%s"}
+ """
+
+ def testAdvancedLuaFFI(self):
+ """
+ Advanced: Test the Lua FFI interface
+ """
+ name = 'luaffi.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, response)
+
+ def testAdvancedLuaFFIUpdate(self):
+ """
+ Advanced: Test the Lua FFI interface via an update
+ """
+ name = 'luaffi.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'SOA', 'IN')
+ query.set_opcode(dns.opcode.UPDATE)
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.REFUSED)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, response)