From: Remi Gacogne Date: Mon, 18 Mar 2019 14:07:34 +0000 (+0100) Subject: dnsdist: Add regression tests for clearRules() and setRules() X-Git-Tag: dnsdist-1.4.0-alpha1~62^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=refs%2Fpull%2F7594%2Fhead;p=thirdparty%2Fpdns.git dnsdist: Add regression tests for clearRules() and setRules() --- diff --git a/regression-tests.dnsdist/test_Advanced.py b/regression-tests.dnsdist/test_Advanced.py index 04adcc0e06..ea943bb00c 100644 --- a/regression-tests.dnsdist/test_Advanced.py +++ b/regression-tests.dnsdist/test_Advanced.py @@ -1831,3 +1831,70 @@ class TestAdvancedEDNSVersionnRule(DNSDistTest): receivedQuery.id = query.id self.assertEquals(query, receivedQuery) self.assertEquals(receivedResponse, response) + +class TestSetRules(DNSDistTest): + + _consoleKey = DNSDistTest.generateConsoleKey() + _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii') + _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort'] + _config_template = """ + setKey("%s") + controlSocket("127.0.0.1:%s") + newServer{address="127.0.0.1:%s"} + addAction(AllRule(), SpoofAction("192.0.2.1")) + """ + + def testClearThenSetRules(self): + """ + Advanced: Clear rules, set rules + + """ + name = 'clearthensetrules.advanced.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + # dnsdist set RA = RD for spoofed responses + query.flags &= ~dns.flags.RD + expectedResponse = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 60, + dns.rdataclass.IN, + dns.rdatatype.A, + '192.0.2.1') + expectedResponse.answer.append(rrset) + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) + + # clear all the rules, we should not be spoofing and get a SERVFAIL from the responder instead + self.sendConsoleCommand("clearRules()") + + expectedResponse = dns.message.make_response(query) + expectedResponse.set_rcode(dns.rcode.SERVFAIL) + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse) + + # insert a new spoofing rule + self.sendConsoleCommand("setRules({ newRuleAction(AllRule(), SpoofAction(\"192.0.2.2\")) })") + + expectedResponse = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 60, + dns.rdataclass.IN, + dns.rdatatype.A, + '192.0.2.2') + expectedResponse.answer.append(rrset) + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + + (_, receivedResponse) = sender(query, response=None, useQueue=False) + self.assertTrue(receivedResponse) + self.assertEquals(expectedResponse, receivedResponse)