sys.exit(2)
print("Query for example.com/A to create statistic data...")
-run_check_call([sdig, "127.0.0.1", str(DNSPORT), "example.com", "A"])
+if daemon == 'authoritative':
+ run_check_call([sdig, "127.0.0.1", str(DNSPORT), "example.com", "A"])
+else:
+ run_check_call([sdig, "127.0.0.1", str(DNSPORT), "example.com", "A", "recurse"])
print("Running tests...")
returncode = 0
self.assert_success_json(r)
data = r.json()
self.assertIn('count', data)
- self.assertEqual(1, data['count'])
+ self.assertEqual(2, data['count'])
@unittest.skipIf(not is_recursor(), "Not applicable")
def test_flush_subtree(self):
self.assert_success_json(r)
data = r.json()
self.assertIn('count', data)
- self.assertEqual(1, data['count'])
+ self.assertEqual(3, data['count'])
r = self.session.put(self.url("/api/v1/servers/localhost/cache/flush?domain=example.com.&subtree=true"))
self.assert_success_json(r)
data = r.json()
self.assertIn('count', data)
- self.assertEqual(2, data['count'])
+ self.assertEqual(4, data['count'])
def test_flush_root(self):
r = self.session.put(self.url("/api/v1/servers/localhost/cache/flush?domain=."))
pdnsutil('rectify-zone', zonename)
def sdig(*args):
- sdig_command_line = [SDIG, '127.0.0.1', str(DNSPORT)] + list(args)
+ if is_auth():
+ sdig_command_line = [SDIG, '127.0.0.1', str(DNSPORT)] + list(args)
+ else:
+ sdig_command_line = [SDIG, '127.0.0.1', str(DNSPORT)] + list(args) + ["recurse"]
try:
return subprocess.check_output(sdig_command_line).decode('utf-8')
except subprocess.CalledProcessError as except_inst: