class TestSpoofingSpoof(DNSDistTest):
_config_template = """
- addDomainSpoof("spoof.spoofing.tests.powerdns.com.", "192.0.2.1", "2001:DB8::1")
- addDomainCNAMESpoof("cnamespoof.spoofing.tests.powerdns.com.", "cname.spoofing.tests.powerdns.com.")
addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1"))
+ addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {aa=true}))
+ addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ad=true}))
+ addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=true}))
+ addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=false}))
addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com."))
- addDomainSpoof("multispoof.spoofing.tests.powerdns.com", {"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"})
+ addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}))
newServer{address="127.0.0.1:%s"}
"""
- def testSpoofA(self):
+ def testSpoofActionA(self):
"""
- Spoofing: Spoof A
+ Spoofing: Spoof A via Action
- Send an A query to "spoof.spoofing.tests.powerdns.com.",
+ Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'spoof.spoofing.tests.powerdns.com.'
+ name = 'spoofaction.spoofing.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
'192.0.2.1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
- def testSpoofAAAA(self):
+ def testSpoofActionAAAA(self):
"""
- Spoofing: Spoof AAAA
+ Spoofing: Spoof AAAA via Action
- Send an AAAA query to "spoof.spoofing.tests.powerdns.com.",
+ Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'spoof.spoofing.tests.powerdns.com.'
+ name = 'spoofaction.spoofing.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
'2001:DB8::1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- def testSpoofCNAME(self):
+ def testSpoofActionCNAME(self):
"""
- Spoofing: Spoof CNAME
+ Spoofing: Spoof CNAME via Action
- Send an A query for "cnamespoof.spoofing.tests.powerdns.com.",
+ Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'cnamespoof.spoofing.tests.powerdns.com.'
+ name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
60,
dns.rdataclass.IN,
dns.rdatatype.CNAME,
- 'cname.spoofing.tests.powerdns.com.')
+ 'cnameaction.spoofing.tests.powerdns.com.')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
- def testSpoofActionA(self):
+ def testSpoofActionMultiA(self):
"""
- Spoofing: Spoof A via Action
+ Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof
- Send an A query to "spoofaction.spoofing.tests.powerdns.com.",
+ Send an A query for "multispoof.spoofing.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'spoofaction.spoofing.tests.powerdns.com.'
+ name = 'multispoof.spoofing.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
60,
dns.rdataclass.IN,
dns.rdatatype.A,
- '192.0.2.1')
+ '192.0.2.2', '192.0.2.1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- def testSpoofActionAAAA(self):
+ def testSpoofActionMultiAAAA(self):
"""
- Spoofing: Spoof AAAA via Action
+ Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof
- Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.",
+ Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'spoofaction.spoofing.tests.powerdns.com.'
+ name = 'multispoof.spoofing.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
60,
dns.rdataclass.IN,
dns.rdatatype.AAAA,
- '2001:DB8::1')
+ '2001:DB8::1', '2001:DB8::2')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
- def testSpoofActionCNAME(self):
+ def testSpoofActionMultiANY(self):
"""
- Spoofing: Spoof CNAME via Action
+ Spoofing: Spoof multiple addresses via AddDomainSpoof
- Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.",
+ Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
check that dnsdist sends a spoofed result.
"""
- name = 'cnamespoofaction.spoofing.tests.powerdns.com.'
- query = dns.message.make_query(name, 'A', 'IN')
+ name = 'multispoof.spoofing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'ANY', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
+
rrset = dns.rrset.from_text(name,
60,
dns.rdataclass.IN,
- dns.rdatatype.CNAME,
- 'cnameaction.spoofing.tests.powerdns.com.')
+ dns.rdatatype.A,
+ '192.0.2.2', '192.0.2.1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.AAAA,
+ '2001:DB8::1', '2001:DB8::2')
+ expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
- def testSpoofActionMultiA(self):
+ def testSpoofActionSetAA(self):
"""
- Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof
-
- Send an A query for "multispoof.spoofing.tests.powerdns.com.",
- check that dnsdist sends a spoofed result.
+ Spoofing: Spoof via Action, setting AA=1
"""
- name = 'multispoof.spoofing.tests.powerdns.com.'
- query = dns.message.make_query(name, 'A', 'IN')
+ name = 'spoofaction-aa.spoofing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'AAAA', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
+ expectedResponse.flags |= dns.flags.AA
rrset = dns.rrset.from_text(name,
60,
dns.rdataclass.IN,
- dns.rdatatype.A,
- '192.0.2.2', '192.0.2.1')
+ dns.rdatatype.AAAA,
+ '2001:DB8::1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
- def testSpoofActionMultiAAAA(self):
+ def testSpoofActionSetAD(self):
"""
- Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof
-
- Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.",
- check that dnsdist sends a spoofed result.
+ Spoofing: Spoof via Action, setting AD=1
"""
- name = 'multispoof.spoofing.tests.powerdns.com.'
+ name = 'spoofaction-ad.spoofing.tests.powerdns.com.'
query = dns.message.make_query(name, 'AAAA', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
+ expectedResponse.flags |= dns.flags.AD
rrset = dns.rrset.from_text(name,
60,
dns.rdataclass.IN,
dns.rdatatype.AAAA,
- '2001:DB8::1', '2001:DB8::2')
+ '2001:DB8::1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
- def testSpoofActionMultiANY(self):
+ def testSpoofActionSetRA(self):
"""
- Spoofing: Spoof multiple addresses via AddDomainSpoof
-
- Send an ANY query for "multispoof.spoofing.tests.powerdns.com.",
- check that dnsdist sends a spoofed result.
+ Spoofing: Spoof via Action, setting RA=1
"""
- name = 'multispoof.spoofing.tests.powerdns.com.'
- query = dns.message.make_query(name, 'ANY', 'IN')
+ name = 'spoofaction-ra.spoofing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'AAAA', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
-
+ expectedResponse.flags |= dns.flags.RA
rrset = dns.rrset.from_text(name,
60,
dns.rdataclass.IN,
- dns.rdatatype.A,
- '192.0.2.2', '192.0.2.1')
+ dns.rdatatype.AAAA,
+ '2001:DB8::1')
expectedResponse.answer.append(rrset)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ def testSpoofActionSetNoRA(self):
+ """
+ Spoofing: Spoof via Action, setting RA=0
+ """
+ name = 'spoofaction-nora.spoofing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'AAAA', 'IN')
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.flags &= ~dns.flags.RA
rrset = dns.rrset.from_text(name,
60,
dns.rdataclass.IN,
dns.rdatatype.AAAA,
- '2001:DB8::1', '2001:DB8::2')
+ '2001:DB8::1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
class TestSpoofingLuaSpoof(DNSDistTest):
'192.0.2.1', '192.0.2.2')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testLuaSpoofAAAA(self):
"""
'2001:DB8::1')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testLuaSpoofAWithCNAME(self):
"""
'spoofedcname.spoofing.tests.powerdns.com.')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
def testLuaSpoofAAAAWithCNAME(self):
"""
'spoofedcname.spoofing.tests.powerdns.com.')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponse, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
class TestSpoofingLuaWithStatistics(DNSDistTest):
self.assertTrue(receivedResponse)
self.assertEquals(expectedResponse2, receivedResponse)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponseAfterwards, receivedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertTrue(receivedResponse)
- self.assertEquals(expectedResponseAfterwards, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponseAfterwards, receivedResponse)