self.assertEquals(query, receivedQuery)
self.assertEquals(response, receivedResponse)
+ def doTestRCodeRatio(self, name, rcode, noerrorcount, rcodecount):
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '192.0.2.1')
+ response.answer.append(rrset)
+ expectedResponse = dns.message.make_response(query)
+ expectedResponse.set_rcode(rcode)
+
+ # start with normal responses
+ for _ in range(noerrorcount-1):
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ # wait for the maintenance function to run
+ time.sleep(2)
+
+ # we should NOT be dropped!
+ (_, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertEquals(receivedResponse, response)
+
+ # now with rcode!
+ sent = 0
+ allowed = 0
+ for _ in range(rcodecount):
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, expectedResponse)
+ sent = sent + 1
+ if receivedQuery:
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(expectedResponse, receivedResponse)
+ allowed = allowed + 1
+ else:
+ # the query has not reached the responder,
+ # let's clear the response queue
+ self.clearToResponderQueue()
+
+ # we should have been able to send all our queries since the minimum number of queries is set to noerrorcount + rcodecount
+ self.assertGreaterEqual(allowed, rcodecount)
+
+ # wait for the maintenance function to run
+ time.sleep(2)
+
+ # we should now be dropped for up to self._dynBlockDuration + self._dynBlockPeriod
+ (_, receivedResponse) = self.sendUDPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
+
+ # wait until we are not blocked anymore
+ time.sleep(self._dynBlockDuration + self._dynBlockPeriod)
+
+ # this one should succeed
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ # again, over TCP this time
+ # start with normal responses
+ for _ in range(noerrorcount-1):
+ (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ # wait for the maintenance function to run
+ time.sleep(2)
+
+ # we should NOT be dropped!
+ (_, receivedResponse) = self.sendUDPQuery(query, response)
+ self.assertEquals(receivedResponse, response)
+
+ # now with rcode!
+ sent = 0
+ allowed = 0
+ for _ in range(rcodecount):
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, expectedResponse)
+ sent = sent + 1
+ if receivedQuery:
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(expectedResponse, receivedResponse)
+ allowed = allowed + 1
+ else:
+ # the query has not reached the responder,
+ # let's clear the response queue
+ self.clearToResponderQueue()
+
+ # we should have been able to send all our queries since the minimum number of queries is set to noerrorcount + rcodecount
+ self.assertGreaterEqual(allowed, rcodecount)
+
+ # wait for the maintenance function to run
+ time.sleep(2)
+
+ # we should now be dropped for up to self._dynBlockDuration + self._dynBlockPeriod
+ (_, receivedResponse) = self.sendTCPQuery(query, response=None, useQueue=False)
+ self.assertEquals(receivedResponse, None)
+
+ # wait until we are not blocked anymore
+ time.sleep(self._dynBlockDuration + self._dynBlockPeriod)
+
+ # this one should succeed
+ (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
class TestDynBlockQPS(DynBlocksTest):
_dynBlockQPS = 10
"""
name = 'qrateactiontruncated.dynblocks.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
+ # dnsdist sets RA = RD for TC responses
+ query.flags &= ~dns.flags.RD
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
60,
name = 'servfailrate.group.dynblocks.tests.powerdns.com.'
self.doTestRCodeRate(name, dns.rcode.SERVFAIL)
+class TestDynBlockGroupServFailsRatio(DynBlocksTest):
+
+ _dynBlockPeriod = 2
+ _dynBlockDuration = 5
+ _config_params = ['_dynBlockPeriod', '_dynBlockDuration', '_testServerPort']
+ _config_template = """
+ local dbr = dynBlockRulesGroup()
+ dbr:setRCodeRatio(DNSRCode.SERVFAIL, 0.2, %d, "Exceeded query rate", %d, 20)
+
+ function maintenance()
+ dbr:apply()
+ end
+
+ newServer{address="127.0.0.1:%s"}
+ """
+
+ def testDynBlocksServFailRatio(self):
+ """
+ Dyn Blocks (group): Server Failure Ratio
+ """
+ name = 'servfailratio.group.dynblocks.tests.powerdns.com.'
+ self.doTestRCodeRatio(name, dns.rcode.SERVFAIL, 10, 10)
+
class TestDynBlockResponseBytes(DynBlocksTest):
_dynBlockBytesPerSecond = 200