#!/usr/bin/env python
-import unittest
import dns
import clientsubnetoption
from dnsdisttests import DNSDistTest
_config_template = """
newServer{address="127.0.0.1:%s"}
truncateTC(true)
- addAction(AndRule{QTypeRule(dnsdist.ANY), TCPRule(false)}, TCAction())
- addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(dnsdist.REFUSED))
+ addAction(AndRule{QTypeRule(DNSQType.ANY), TCPRule(false)}, TCAction())
+ addAction(RegexRule("evil[0-9]{4,}\\\\.regex\\\\.tests\\\\.powerdns\\\\.com$"), RCodeAction(DNSRCode.REFUSED))
mySMN = newSuffixMatchNode()
mySMN:add(newDNSName("nameAndQtype.tests.powerdns.com."))
- addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(dnsdist.NOTIMP))
+ addAction(AndRule{SuffixMatchNodeRule(mySMN), QTypeRule("TXT")}, RCodeAction(DNSRCode.NOTIMP))
addAction(makeRule("drop.test.powerdns.com."), DropAction())
- addAction(AndRule({QTypeRule(dnsdist.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
- addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(dnsdist.REFUSED))
- addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(dnsdist.REFUSED))
+ addAction(AndRule({QTypeRule(DNSQType.A),QNameRule("ds9a.nl")}), SpoofAction("1.2.3.4"))
+ addAction(newDNSName("dnsname.addaction.powerdns.com."), RCodeAction(DNSRCode.REFUSED))
+ addAction({newDNSName("dnsname-table1.addaction.powerdns.com."), newDNSName("dnsname-table2.addaction.powerdns.com.")}, RCodeAction(DNSRCode.REFUSED))
"""
def testDropped(self):
"""
name = 'drop.test.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, None)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, None)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
def testAWithECS(self):
"""
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
def testSimpleA(self):
"""
'127.0.0.1')
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
def testAnyIsTruncated(self):
"""
"""
name = 'any.tests.powerdns.com.'
query = dns.message.make_query(name, 'ANY', 'IN')
+ # dnsdist sets RA = RD for TC responses
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.flags |= dns.flags.TC
"""
name = 'evil4242.regex.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
def testQNameReturnsSpoofed(self):
"""
'1.2.3.4')
expectedResponse.answer.append(rrset)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
-
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
def testDomainAndQTypeReturnsNotImplemented(self):
"""
"""
name = 'nameAndQtype.tests.powerdns.com.'
query = dns.message.make_query(name, 'TXT', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.NOTIMP)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
-
- (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
def testDomainWithoutQTypeIsNotAffected(self):
"""
'127.0.0.1')
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
def testOtherDomainANDQTypeIsNotAffected(self):
"""
'nothing to see here')
response.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- self.assertTrue(receivedResponse)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(response, receivedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
def testWrongResponse(self):
"""
'nothing to see here')
unrelatedResponse.answer.append(rrset)
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, unrelatedResponse)
- self.assertTrue(receivedQuery)
- self.assertEquals(receivedResponse, None)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, unrelatedResponse)
- self.assertTrue(receivedQuery)
- self.assertEquals(receivedResponse, None)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, unrelatedResponse)
+ self.assertTrue(receivedQuery)
+ self.assertEquals(receivedResponse, None)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
def testHeaderOnlyRefused(self):
"""
response.set_rcode(dns.rcode.REFUSED)
response.question = []
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, response)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, response)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, response)
def testHeaderOnlyNoErrorResponse(self):
"""
response = dns.message.make_response(query)
response.question = []
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, None)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, None)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, None)
def testHeaderOnlyNXDResponse(self):
"""
response.set_rcode(dns.rcode.NXDOMAIN)
response.question = []
- (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, None)
-
- (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
- self.assertTrue(receivedQuery)
- receivedQuery.id = query.id
- self.assertEquals(query, receivedQuery)
- self.assertEquals(receivedResponse, None)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, None)
def testAddActionDNSName(self):
"""
"""
name = 'dnsname.addaction.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
def testAddActionDNSNames(self):
"""
"""
for name in ['dnsname-table{}.addaction.powerdns.com.'.format(i) for i in range(1,2)]:
query = dns.message.make_query(name, 'A', 'IN')
+ query.flags &= ~dns.flags.RD
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.REFUSED)
- (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
- self.assertEquals(receivedResponse, expectedResponse)
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, expectedResponse)
-if __name__ == '__main__':
- unittest.main()
- exit(0)