_dynBlockDuration = _maintenanceWaitTime + 2
_config_params = ['_dynBlockQPS', '_dynBlockPeriod', '_dynBlockDuration', '_testServerPort']
- def doTestDynBlockViaAPI(self, ipRange, reason, minSeconds, maxSeconds, minBlocks, maxBlocks):
+ def doTestDynBlockViaAPI(self, ipRange, reason, minSeconds, maxSeconds, minBlocks, maxBlocks, ebpf=False):
headers = {'x-api-key': self._webServerAPIKey}
url = 'http://127.0.0.1:' + str(self._webServerPort) + '/jsonstat?command=dynblocklist'
r = requests.get(url, headers=headers, timeout=self._webTimeout)
self.assertIn(ipRange, content)
values = content[ipRange]
- for key in ['reason', 'seconds', 'blocks', 'action']:
+ for key in ['reason', 'seconds', 'blocks', 'action', 'ebpf']:
self.assertIn(key, values)
self.assertEqual(values['reason'], reason)
self.assertLessEqual(values['seconds'], maxSeconds)
self.assertGreaterEqual(values['blocks'], minBlocks)
self.assertLessEqual(values['blocks'], maxBlocks)
+ self.assertEqual(values['ebpf'], True if ebpf else False)
- def doTestQRate(self, name, testViaAPI=True):
+ def doTestQRate(self, name, testViaAPI=True, ebpf=False):
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
self.assertEqual(receivedResponse, None)
if testViaAPI:
- self.doTestDynBlockViaAPI('127.0.0.1/32', 'Exceeded query rate', 1, self._dynBlockDuration, (sent-allowed)+1, (sent-allowed)+1)
+ self.doTestDynBlockViaAPI('127.0.0.1/32', 'Exceeded query rate', 1, self._dynBlockDuration, (sent-allowed)+1, (sent-allowed)+1, ebpf)
# wait until we are not blocked anymore
time.sleep(self._dynBlockDuration + self._dynBlockPeriod)