class TestSpoofingSpoof(DNSDistTest):
_config_template = """
- addDomainSpoof("spoof.spoofing.tests.powerdns.com.", "192.0.2.1", "2001:DB8::1")
- addDomainCNAMESpoof("cnamespoof.spoofing.tests.powerdns.com.", "cname.spoofing.tests.powerdns.com.")
addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
- addDomainSpoof("multispoof.spoofing.tests.powerdns.com", {"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"})
+ addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
newServer{address="127.0.0.1:%s"}
"""
- def testSpoofA(self):
- """
- Spoofing: Spoof A
-
- Send an A query to "spoof.spoofing.tests.powerdns.com.",
- check that dnsdist sends a spoofed result.
- """
- name = 'spoof.spoofing.tests.powerdns.com.'
- query = dns.message.make_query(name, 'A', 'IN')
- # dnsdist set RA = RD for spoofed responses
- query.flags &= ~dns.flags.RD
- expectedResponse = dns.message.make_response(query)
- rrset = dns.rrset.from_text(name,
- 60,
- dns.rdataclass.IN,
- dns.rdatatype.A,
- '192.0.2.1')
- expectedResponse.answer.append(rrset)
-
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- def testSpoofAAAA(self):
- """
- Spoofing: Spoof AAAA
-
- Send an AAAA query to "spoof.spoofing.tests.powerdns.com.",
- check that dnsdist sends a spoofed result.
- """
- name = 'spoof.spoofing.tests.powerdns.com.'
- query = dns.message.make_query(name, 'AAAA', 'IN')
- # dnsdist set RA = RD for spoofed responses
- query.flags &= ~dns.flags.RD
- expectedResponse = dns.message.make_response(query)
- rrset = dns.rrset.from_text(name,
- 60,
- dns.rdataclass.IN,
- dns.rdatatype.AAAA,
- '2001:DB8::1')
- expectedResponse.answer.append(rrset)
-
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- def testSpoofCNAME(self):
- """
- Spoofing: Spoof CNAME
-
- Send an A query for "cnamespoof.spoofing.tests.powerdns.com.",
- check that dnsdist sends a spoofed result.
- """
- name = 'cnamespoof.spoofing.tests.powerdns.com.'
- query = dns.message.make_query(name, 'A', 'IN')
- # dnsdist set RA = RD for spoofed responses
- query.flags &= ~dns.flags.RD
- expectedResponse = dns.message.make_response(query)
- rrset = dns.rrset.from_text(name,
- 60,
- dns.rdataclass.IN,
- dns.rdatatype.CNAME,
- 'cname.spoofing.tests.powerdns.com.')
- expectedResponse.answer.append(rrset)
-
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
def testSpoofActionA(self):
"""
Spoofing: Spoof A via Action
'192.0.2.1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testSpoofActionAAAA(self):
"""
'2001:DB8::1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testSpoofActionCNAME(self):
"""
'cnameaction.spoofing.tests.powerdns.com.')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testSpoofActionMultiA(self):
"""
'192.0.2.2', '192.0.2.1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testSpoofActionMultiAAAA(self):
"""
'2001:DB8::1', '2001:DB8::2')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testSpoofActionMultiANY(self):
"""
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
-
+
rrset = dns.rrset.from_text(name,
60,
dns.rdataclass.IN,
dns.rdatatype.A,
'192.0.2.2', '192.0.2.1')
expectedResponse.answer.append(rrset)
-
+
rrset = dns.rrset.from_text(name,
60,
dns.rdataclass.IN,
'2001:DB8::1', '2001:DB8::2')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
class TestSpoofingLuaSpoof(DNSDistTest):
function spoof1rule(dq)
if(dq.qtype==1) -- A
then
- return DNSAction.Spoof, "192.0.2.1"
+ return DNSAction.Spoof, "192.0.2.1,192.0.2.2"
elseif(dq.qtype == 28) -- AAAA
then
return DNSAction.Spoof, "2001:DB8::1"
function spoof2rule(dq)
return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com."
end
- addLuaAction("luaspoof1.spoofing.tests.powerdns.com.", spoof1rule)
- addLuaAction("luaspoof2.spoofing.tests.powerdns.com.", spoof2rule)
+ addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
+ addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule))
newServer{address="127.0.0.1:%s"}
"""
60,
dns.rdataclass.IN,
dns.rdatatype.A,
- '192.0.2.1')
+ '192.0.2.1', '192.0.2.2')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testLuaSpoofAAAA(self):
"""
'2001:DB8::1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testLuaSpoofAWithCNAME(self):
"""
'spoofedcname.spoofing.tests.powerdns.com.')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testLuaSpoofAAAAWithCNAME(self):
"""
'spoofedcname.spoofing.tests.powerdns.com.')
expectedResponse.answer.append(rrset)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+class TestSpoofingLuaWithStatistics(DNSDistTest):
+
+ _config_template = """
+ function spoof1rule(dq)
+ queriesCount = getStatisticsCounters()['queries']
+ if(queriesCount == 1) then
+ return DNSAction.Spoof, "192.0.2.1"
+ elseif(queriesCount == 2) then
+ return DNSAction.Spoof, "192.0.2.2"
+ else
+ return DNSAction.Spoof, "192.0.2.0"
+ end
+ end
+ addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule))
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testLuaSpoofBasedOnStatistics(self):
+ """
+ Spoofing: Spoofing an A via Lua based on statistics counters
+
+ """
+ name = 'luaspoofwithstats.spoofing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ expectedResponse1 = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ expectedResponse1.answer.append(rrset)
+ expectedResponse2 = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.2')
+ expectedResponse2.answer.append(rrset)
+ expectedResponseAfterwards = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.0')
+ expectedResponseAfterwards.answer.append(rrset)
+
(_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ self.assertEquals(expectedResponse1, receivedResponse)
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ self.assertEquals(expectedResponse2, receivedResponse)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponseAfterwards, receivedResponse)