X-Git-Url: http://git.ipfire.org/?a=blobdiff_plain;f=regression-tests.dnsdist%2Ftest_Spoofing.py;h=2ae03bf8668a178a0b846bdd8c9d5ab3b6102567;hb=5cdbe0af873d0ffaa82452312611c70ed94bb6ea;hp=7c271a069236dcd020f4e6caef98f4f65042366b;hpb=4e265d762f9961d8017cde8137e66fd5205061e6;p=thirdparty%2Fpdns.git diff --git a/regression-tests.dnsdist/test_Spoofing.py b/regression-tests.dnsdist/test_Spoofing.py index 7c271a0692..2ae03bf866 100644 --- a/regression-tests.dnsdist/test_Spoofing.py +++ b/regression-tests.dnsdist/test_Spoofing.py @@ -5,22 +5,24 @@ from dnsdisttests import DNSDistTest class TestSpoofingSpoof(DNSDistTest): _config_template = """ - addDomainSpoof("spoof.spoofing.tests.powerdns.com.", "192.0.2.1", "2001:DB8::1") - addDomainCNAMESpoof("cnamespoof.spoofing.tests.powerdns.com.", "cname.spoofing.tests.powerdns.com.") addAction(makeRule("spoofaction.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1")) + addAction(makeRule("spoofaction-aa.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {aa=true})) + addAction(makeRule("spoofaction-ad.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ad=true})) + addAction(makeRule("spoofaction-ra.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=true})) + addAction(makeRule("spoofaction-nora.spoofing.tests.powerdns.com."), SpoofAction("192.0.2.1", "2001:DB8::1", {ra=false})) addAction(makeRule("cnamespoofaction.spoofing.tests.powerdns.com."), SpoofCNAMEAction("cnameaction.spoofing.tests.powerdns.com.")) - addDomainSpoof("multispoof.spoofing.tests.powerdns.com", {"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"}) + addAction("multispoof.spoofing.tests.powerdns.com", SpoofAction({"192.0.2.1", "192.0.2.2", "2001:DB8::1", "2001:DB8::2"})) newServer{address="127.0.0.1:%s"} """ - def testSpoofA(self): + def testSpoofActionA(self): """ - Spoofing: Spoof A + Spoofing: Spoof A via Action - Send an A query to "spoof.spoofing.tests.powerdns.com.", + Send an A query to "spoofaction.spoofing.tests.powerdns.com.", check that dnsdist sends a spoofed result. """ - name = 'spoof.spoofing.tests.powerdns.com.' + name = 'spoofaction.spoofing.tests.powerdns.com.' query = dns.message.make_query(name, 'A', 'IN') # dnsdist set RA = RD for spoofed responses query.flags &= ~dns.flags.RD @@ -32,22 +34,20 @@ class TestSpoofingSpoof(DNSDistTest): '192.0.2.1') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) - def testSpoofAAAA(self): + def testSpoofActionAAAA(self): """ - Spoofing: Spoof AAAA + Spoofing: Spoof AAAA via Action - Send an AAAA query to "spoof.spoofing.tests.powerdns.com.", + Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.", check that dnsdist sends a spoofed result. """ - name = 'spoof.spoofing.tests.powerdns.com.' + name = 'spoofaction.spoofing.tests.powerdns.com.' query = dns.message.make_query(name, 'AAAA', 'IN') # dnsdist set RA = RD for spoofed responses query.flags &= ~dns.flags.RD @@ -59,22 +59,20 @@ class TestSpoofingSpoof(DNSDistTest): '2001:DB8::1') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - def testSpoofCNAME(self): + def testSpoofActionCNAME(self): """ - Spoofing: Spoof CNAME + Spoofing: Spoof CNAME via Action - Send an A query for "cnamespoof.spoofing.tests.powerdns.com.", + Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.", check that dnsdist sends a spoofed result. """ - name = 'cnamespoof.spoofing.tests.powerdns.com.' + name = 'cnamespoofaction.spoofing.tests.powerdns.com.' query = dns.message.make_query(name, 'A', 'IN') # dnsdist set RA = RD for spoofed responses query.flags &= ~dns.flags.RD @@ -83,25 +81,23 @@ class TestSpoofingSpoof(DNSDistTest): 60, dns.rdataclass.IN, dns.rdatatype.CNAME, - 'cname.spoofing.tests.powerdns.com.') + 'cnameaction.spoofing.tests.powerdns.com.') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) - def testSpoofActionA(self): + def testSpoofActionMultiA(self): """ - Spoofing: Spoof A via Action + Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof - Send an A query to "spoofaction.spoofing.tests.powerdns.com.", + Send an A query for "multispoof.spoofing.tests.powerdns.com.", check that dnsdist sends a spoofed result. """ - name = 'spoofaction.spoofing.tests.powerdns.com.' + name = 'multispoof.spoofing.tests.powerdns.com.' query = dns.message.make_query(name, 'A', 'IN') # dnsdist set RA = RD for spoofed responses query.flags &= ~dns.flags.RD @@ -110,25 +106,23 @@ class TestSpoofingSpoof(DNSDistTest): 60, dns.rdataclass.IN, dns.rdatatype.A, - '192.0.2.1') + '192.0.2.2', '192.0.2.1') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) - def testSpoofActionAAAA(self): + def testSpoofActionMultiAAAA(self): """ - Spoofing: Spoof AAAA via Action + Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof - Send an AAAA query to "spoofaction.spoofing.tests.powerdns.com.", + Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.", check that dnsdist sends a spoofed result. """ - name = 'spoofaction.spoofing.tests.powerdns.com.' + name = 'multispoof.spoofing.tests.powerdns.com.' query = dns.message.make_query(name, 'AAAA', 'IN') # dnsdist set RA = RD for spoofed responses query.flags &= ~dns.flags.RD @@ -137,132 +131,137 @@ class TestSpoofingSpoof(DNSDistTest): 60, dns.rdataclass.IN, dns.rdatatype.AAAA, - '2001:DB8::1') + '2001:DB8::1', '2001:DB8::2') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) - def testSpoofActionCNAME(self): + def testSpoofActionMultiANY(self): """ - Spoofing: Spoof CNAME via Action + Spoofing: Spoof multiple addresses via AddDomainSpoof - Send an A query for "cnamespoofaction.spoofing.tests.powerdns.com.", + Send an ANY query for "multispoof.spoofing.tests.powerdns.com.", check that dnsdist sends a spoofed result. """ - name = 'cnamespoofaction.spoofing.tests.powerdns.com.' - query = dns.message.make_query(name, 'A', 'IN') + name = 'multispoof.spoofing.tests.powerdns.com.' + query = dns.message.make_query(name, 'ANY', 'IN') # dnsdist set RA = RD for spoofed responses query.flags &= ~dns.flags.RD expectedResponse = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, - dns.rdatatype.CNAME, - 'cnameaction.spoofing.tests.powerdns.com.') + dns.rdatatype.A, + '192.0.2.2', '192.0.2.1') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + rrset = dns.rrset.from_text(name, + 60, + dns.rdataclass.IN, + dns.rdatatype.AAAA, + '2001:DB8::1', '2001:DB8::2') + expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) - def testSpoofActionMultiA(self): + def testSpoofActionSetAA(self): """ - Spoofing: Spoof multiple IPv4 addresses via AddDomainSpoof - - Send an A query for "multispoof.spoofing.tests.powerdns.com.", - check that dnsdist sends a spoofed result. + Spoofing: Spoof via Action, setting AA=1 """ - name = 'multispoof.spoofing.tests.powerdns.com.' - query = dns.message.make_query(name, 'A', 'IN') + name = 'spoofaction-aa.spoofing.tests.powerdns.com.' + query = dns.message.make_query(name, 'AAAA', 'IN') # dnsdist set RA = RD for spoofed responses query.flags &= ~dns.flags.RD expectedResponse = dns.message.make_response(query) + expectedResponse.flags |= dns.flags.AA rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, - dns.rdatatype.A, - '192.0.2.2', '192.0.2.1') + dns.rdatatype.AAAA, + '2001:DB8::1') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - def testSpoofActionMultiAAAA(self): + def testSpoofActionSetAD(self): """ - Spoofing: Spoof multiple IPv6 addresses via AddDomainSpoof - - Send an AAAA query for "multispoof.spoofing.tests.powerdns.com.", - check that dnsdist sends a spoofed result. + Spoofing: Spoof via Action, setting AD=1 """ - name = 'multispoof.spoofing.tests.powerdns.com.' + name = 'spoofaction-ad.spoofing.tests.powerdns.com.' query = dns.message.make_query(name, 'AAAA', 'IN') # dnsdist set RA = RD for spoofed responses query.flags &= ~dns.flags.RD expectedResponse = dns.message.make_response(query) + expectedResponse.flags |= dns.flags.AD rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA, - '2001:DB8::1', '2001:DB8::2') + '2001:DB8::1') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) - def testSpoofActionMultiANY(self): + def testSpoofActionSetRA(self): """ - Spoofing: Spoof multiple addresses via AddDomainSpoof - - Send an ANY query for "multispoof.spoofing.tests.powerdns.com.", - check that dnsdist sends a spoofed result. + Spoofing: Spoof via Action, setting RA=1 """ - name = 'multispoof.spoofing.tests.powerdns.com.' - query = dns.message.make_query(name, 'ANY', 'IN') + name = 'spoofaction-ra.spoofing.tests.powerdns.com.' + query = dns.message.make_query(name, 'AAAA', 'IN') # dnsdist set RA = RD for spoofed responses query.flags &= ~dns.flags.RD expectedResponse = dns.message.make_response(query) - + expectedResponse.flags |= dns.flags.RA rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, - dns.rdatatype.A, - '192.0.2.2', '192.0.2.1') + dns.rdatatype.AAAA, + '2001:DB8::1') expectedResponse.answer.append(rrset) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) + + def testSpoofActionSetNoRA(self): + """ + Spoofing: Spoof via Action, setting RA=0 + """ + name = 'spoofaction-nora.spoofing.tests.powerdns.com.' + query = dns.message.make_query(name, 'AAAA', 'IN') + expectedResponse = dns.message.make_response(query) + expectedResponse.flags &= ~dns.flags.RA rrset = dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA, - '2001:DB8::1', '2001:DB8::2') + '2001:DB8::1') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) class TestSpoofingLuaSpoof(DNSDistTest): @@ -270,7 +269,7 @@ class TestSpoofingLuaSpoof(DNSDistTest): function spoof1rule(dq) if(dq.qtype==1) -- A then - return DNSAction.Spoof, "192.0.2.1" + return DNSAction.Spoof, "192.0.2.1,192.0.2.2" elseif(dq.qtype == 28) -- AAAA then return DNSAction.Spoof, "2001:DB8::1" @@ -281,8 +280,8 @@ class TestSpoofingLuaSpoof(DNSDistTest): function spoof2rule(dq) return DNSAction.Spoof, "spoofedcname.spoofing.tests.powerdns.com." end - addLuaAction("luaspoof1.spoofing.tests.powerdns.com.", spoof1rule) - addLuaAction("luaspoof2.spoofing.tests.powerdns.com.", spoof2rule) + addAction("luaspoof1.spoofing.tests.powerdns.com.", LuaAction(spoof1rule)) + addAction("luaspoof2.spoofing.tests.powerdns.com.", LuaAction(spoof2rule)) newServer{address="127.0.0.1:%s"} """ @@ -302,16 +301,14 @@ class TestSpoofingLuaSpoof(DNSDistTest): 60, dns.rdataclass.IN, dns.rdatatype.A, - '192.0.2.1') + '192.0.2.1', '192.0.2.2') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) def testLuaSpoofAAAA(self): """ @@ -332,13 +329,11 @@ class TestSpoofingLuaSpoof(DNSDistTest): '2001:DB8::1') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) def testLuaSpoofAWithCNAME(self): """ @@ -359,13 +354,11 @@ class TestSpoofingLuaSpoof(DNSDistTest): 'spoofedcname.spoofing.tests.powerdns.com.') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) def testLuaSpoofAAAAWithCNAME(self): """ @@ -386,13 +379,11 @@ class TestSpoofingLuaSpoof(DNSDistTest): 'spoofedcname.spoofing.tests.powerdns.com.') expectedResponse.answer.append(rrset) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponse, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) class TestSpoofingLuaWithStatistics(DNSDistTest): @@ -407,7 +398,7 @@ class TestSpoofingLuaWithStatistics(DNSDistTest): return DNSAction.Spoof, "192.0.2.0" end end - addLuaAction("luaspoofwithstats.spoofing.tests.powerdns.com.", spoof1rule) + addAction("luaspoofwithstats.spoofing.tests.powerdns.com.", LuaAction(spoof1rule)) newServer{address="127.0.0.1:%s"} """ @@ -450,10 +441,8 @@ class TestSpoofingLuaWithStatistics(DNSDistTest): self.assertTrue(receivedResponse) self.assertEquals(expectedResponse2, receivedResponse) - (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponseAfterwards, receivedResponse) - - (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False) - self.assertTrue(receivedResponse) - self.assertEquals(expectedResponseAfterwards, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponseAfterwards, receivedResponse)