_config_template = """
newServer{address="127.0.0.1:%s"}
truncateTC(true)
- addAnyTCRule()
- addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
+ addAction(AndRule{QTypeRule(DNSQType.ANY), TCPRule(false)}, TCAction())
+ addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
mySMN = newSuffixMatchNode()
mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
- addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
+ addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
addAction(makeRule("drop.test.powerdns.com."), DropAction())
- addAction(AndRule({QTypeRule(dnsdist.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
- addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
- addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
- block=newDNSName("powerdns.org.")
- function blockFilter(dq)
- if(dq.qname:isPartOf(block))
- then
- print("Blocking *.powerdns.org")
- return true
- end
- return false
- end
+ addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
+ addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
+ addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
"""
def testDropped(self):
"""
name = 'drop.test.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, None)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, None)
-
- def testBlockedA(self):
- """
- Basics: Blocked A query
-
- Send an A query for the powerdns.org domain,
- which is blocked by configuration. We expect
- no response.
- """
- name = 'blockeda.tests.powerdns.org.'
- query = dns.message.make_query(name, 'A', 'IN')
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, None)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, None)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
def testAWithECS(self):
"""
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
def testSimpleA(self):
"""
'127.0.0.1')
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
def testAnyIsTruncated(self):
"""
response.answer.append(rrset)
response.flags |= dns.flags.TC
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.flags |= dns.flags.TC
+
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(expectedResponse.flags, receivedResponse.flags)
+ self.assertEquals(expectedResponse.question, receivedResponse.question)
+ self.assertFalse(response.answer == receivedResponse.answer)
+ self.assertEquals(len(receivedResponse.answer), 0)
+ self.assertEquals(len(receivedResponse.authority), 0)
+ self.assertEquals(len(receivedResponse.additional), 0)
+ self.checkMessageNoEDNS(expectedResponse, receivedResponse)
+
+ def testTruncateTCEDNS(self):
+ """
+ Basics: Truncate TC with EDNS
+
+ dnsdist is configured to truncate TC (default),
+ we make the backend send responses
+ with TC set and additional content,
+ and check that the received response has been fixed.
+ Note that the query and initial response had EDNS,
+ so the final response should have it too.
+ """
+ name = 'atruncatetc.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, want_dnssec=True)
+ response = dns.message.make_response(query)
+ # force a different responder payload than the one in the query,
+ # so we check that we don't just mirror it
+ response.payload = 4242
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+
+ response.answer.append(rrset)
+ response.flags |= dns.flags.TC
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.payload = 4242
+ expectedResponse.flags |= dns.flags.TC
(receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
receivedQuery.id = query.id
self.assertEquals(len(receivedResponse.answer), 0)
self.assertEquals(len(receivedResponse.authority), 0)
self.assertEquals(len(receivedResponse.additional), 0)
+ print(expectedResponse)
+ print(receivedResponse)
+ self.checkMessageEDNSWithoutOptions(expectedResponse, receivedResponse)
+ self.assertFalse(receivedResponse.ednsflags & dns.flags.DO)
+ self.assertEquals(receivedResponse.payload, 4242)
def testRegexReturnsRefused(self):
"""
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
def testQNameReturnsSpoofed(self):
"""
'1.2.3.4')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
-
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
def testDomainAndQTypeReturnsNotImplemented(self):
"""
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.NOTIMP)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
def testDomainWithoutQTypeIsNotAffected(self):
"""
'127.0.0.1')
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
def testOtherDomainANDQTypeIsNotAffected(self):
"""
'nothing to see here')
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
def testWrongResponse(self):
"""
'nothing to see here')
unrelatedResponse.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, unrelatedResponse)
- self.assertTrue(receivedQuery)
- self.assertEquals(receivedResponse, None)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, unrelatedResponse)
- self.assertTrue(receivedQuery)
- self.assertEquals(receivedResponse, None)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, unrelatedResponse)
+ self.assertTrue(receivedQuery)
+ self.assertEquals(receivedResponse, None)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
def testHeaderOnlyRefused(self):
"""
response.set_rcode(dns.rcode.REFUSED)
response.question = []
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, response)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, response)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
def testHeaderOnlyNoErrorResponse(self):
"""
response = dns.message.make_response(query)
response.question = []
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, None)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, None)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, None)
def testHeaderOnlyNXDResponse(self):
"""
response.set_rcode(dns.rcode.NXDOMAIN)
response.question = []
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, None)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, None)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, None)
def testAddActionDNSName(self):
"""
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
def testAddActionDNSNames(self):
"""
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
-if __name__ == '__main__':
- unittest.main()
- exit(0)