import requests
import time
import dns
-from dnsdisttests import DNSDistTest, range
+from dnsdisttests import DNSDistTest
+try:
+ range = xrange
+except NameError:
+ pass
class DynBlocksTest(DNSDistTest):
name = 'qrateactionrefused.dynblocks.tests.powerdns.com.'
self.doTestQRateRCode(name, dns.rcode.REFUSED)
+class TestDynBlockQPSActionNXD(DynBlocksTest):
+
+ _dynBlockQPS = 10
+ _dynBlockPeriod = 2
+ _dynBlockDuration = 5
+ _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort']
+ _config_template = """
+ function maintenance()
+ addDynBlocks(exceedQRate(%d, %d), "Exceeded query rate", %d, DNSAction.Nxdomain)
+ end
+ setDynBlocksAction(DNSAction.Drop)
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testDynBlocksQRate(self):
+ """
+ Dyn Blocks: QRate NXD (action)
+ """
+ name = 'qrateactionnxd.dynblocks.tests.powerdns.com.'
+ self.doTestQRateRCode(name, dns.rcode.NXDOMAIN)
+
class TestDynBlockGroupQPSActionRefused(DynBlocksTest):
_dynBlockQPS = 10
_config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort']
_config_template = """
local dbr = dynBlockRulesGroup()
- dbr:setRCodeRate(dnsdist.SERVFAIL, %d, %d, "Exceeded query rate", %d)
+ dbr:setRCodeRate(DNSRCode.SERVFAIL, %d, %d, "Exceeded query rate", %d)
function maintenance()
dbr:apply()
# check that the rule has been inserted
self.doTestDynBlockViaAPI('127.0.0.1/32', 'Exceeded query rate', self._dynBlockDuration - 4, self._dynBlockDuration, 0, sent)
+
+class TestDynBlockGroupWarning(DynBlocksTest):
+
+ _dynBlockWarningQPS = 5
+ _dynBlockQPS = 20
+ _dynBlockPeriod = 2
+ _dynBlockDuration = 5
+ _config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_dynBlockWarningQPS', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
+ _config_template = """
+ local dbr = dynBlockRulesGroup()
+ dbr:setQueryRate(%d, %d, "Exceeded query rate", %d, DNSAction.Drop, %d)
+
+ function maintenance()
+ dbr:apply()
+ end
+
+ newServer{address="127.0.0.1:%s"}
+ webserver("127.0.0.1:%s", "%s", "%s")
+ """
+
+ def testWarning(self):
+ """
+ Dyn Blocks (group) : Warning
+ """
+ name = 'warning.group.dynblocks.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+
+ allowed = 0
+ sent = 0
+ for _ in range((self._dynBlockWarningQPS * self._dynBlockPeriod) + 1):
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ sent = sent + 1
+ if receivedQuery:
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+ allowed = allowed + 1
+ else:
+ # the query has not reached the responder,
+ # let's clear the response queue
+ self.clearToResponderQueue()
+
+ # a dynamic rule should have been inserted, but the queries should
+ # still go on because we are still at warning level
+ self.assertEqual(allowed, sent)
+
+ # wait for the maintenance function to run
+ time.sleep(2)
+
+ # the rule should still be present, but the queries pass through anyway
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(receivedResponse, receivedResponse)
+
+ # check that the rule has been inserted
+ self.doTestDynBlockViaAPI('127.0.0.1/32', 'Exceeded query rate', self._dynBlockDuration - 4, self._dynBlockDuration, 0, sent)
+
+ self.doTestQRate(name)