receivedQuery.id = query.id
self.assertEquals(query, receivedQuery)
self.assertEquals(receivedResponse, response)
+
+class TestAdvancedEDNSVersionRule(DNSDistTest):
+
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ addAction(EDNSVersionRule(0), ERCodeAction(dnsdist.BADVERS))
+ """
+
+ def testDropped(self):
+ """
+ Advanced: A question with ECS version larger than 0 is dropped
+ """
+
+ name = 'ednsversionrule.advanced.tests.powerdns.com.'
+
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=1)
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.BADVERS)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None)
+ self.assertEquals(receivedResponse, expectedResponse)
+
+ def testNoEDNS0Pass(self):
+ """
+ Advanced: A question with ECS version 0 goes through
+ """
+
+ name = 'ednsversionrule.advanced.tests.powerdns.com.'
+
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=True)
+ response = dns.message.make_response(query)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+ def testReplied(self):
+ """
+ Advanced: A question without ECS goes through
+ """
+
+ name = 'ednsoptionrule.advanced.tests.powerdns.com.'
+
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ response = dns.message.make_response(query)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
+
+class TestSetRules(DNSDistTest):
+
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort']
+ _config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+ newServer{address="127.0.0.1:%s"}
+ addAction(AllRule(), SpoofAction("192.0.2.1"))
+ """
+
+ def testClearThenSetRules(self):
+ """
+ Advanced: Clear rules, set rules
+
+ """
+ name = 'clearthensetrules.advanced.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist set RA = RD for spoofed responses
+ query.flags &= ~dns.flags.RD
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ expectedResponse.answer.append(rrset)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ # clear all the rules, we should not be spoofing and get a SERVFAIL from the responder instead
+ self.sendConsoleCommand("clearRules()")
+
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(dns.rcode.SERVFAIL)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)
+
+ # insert a new spoofing rule
+ self.sendConsoleCommand("setRules({ newRuleAction(AllRule(), SpoofAction(\"192.0.2.2\")) })")
+
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.2')
+ expectedResponse.answer.append(rrset)
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertTrue(receivedResponse)
+ self.assertEquals(expectedResponse, receivedResponse)