:query server_id: The name of the server
:query domain: The domainname to flush for
+ :query subtree: If set to `true`, also flush the whole subtree (default=`false`)
**Example Response:**
throw HttpMethodNotAllowedException();
DNSName canon = apiNameToDNSName(req->getvars["domain"]);
+ bool subtree = (req->getvars["subtree"].compare("true") == 0);
- int count = broadcastAccFunction<uint64_t>(boost::bind(pleaseWipeCache, canon, false));
- count += broadcastAccFunction<uint64_t>(boost::bind(pleaseWipePacketCache, canon, false));
- count += broadcastAccFunction<uint64_t>(boost::bind(pleaseWipeAndCountNegCache, canon, false));
+ int count = broadcastAccFunction<uint64_t>(boost::bind(pleaseWipeCache, canon, subtree));
+ count += broadcastAccFunction<uint64_t>(boost::bind(pleaseWipePacketCache, canon, subtree));
+ count += broadcastAccFunction<uint64_t>(boost::bind(pleaseWipeAndCountNegCache, canon, subtree));
resp->setBody(Json::object {
{ "count", count },
{ "result", "Flushed cache." }
'DAEMON': daemon,
'SQLITE_DB': SQLITE_DB,
'PDNSUTIL_CMD': ' '.join(PDNSUTIL_CMD),
+ 'SDIG': sdig,
+ 'DNSPORT': str(DNSPORT)
})
try:
-from test_helper import ApiTestCase, is_auth, is_recursor
+from test_helper import ApiTestCase, is_auth, is_recursor, sdig
class Servers(ApiTestCase):
data = r.json()
self.assertIn('count', data)
+ def test_flush_count(self):
+ sdig("ns1.example.com", 'A')
+ r = self.session.put(self.url("/api/v1/servers/localhost/cache/flush?domain=ns1.example.com."))
+ self.assert_success_json(r)
+ data = r.json()
+ self.assertIn('count', data)
+ self.assertEquals(1, data['count'])
+
+ def test_flush_subtree(self):
+ sdig("ns1.example.com", 'A')
+ sdig("ns2.example.com", 'A')
+ r = self.session.put(self.url("/api/v1/servers/localhost/cache/flush?domain=example.com.&subtree=false"))
+ self.assert_success_json(r)
+ data = r.json()
+ self.assertIn('count', data)
+ self.assertEquals(1, data['count'])
+ r = self.session.put(self.url("/api/v1/servers/localhost/cache/flush?domain=example.com.&subtree=true"))
+ self.assert_success_json(r)
+ data = r.json()
+ self.assertIn('count', data)
+ self.assertEquals(2, data['count'])
+
def test_flush_root(self):
r = self.session.put(self.url("/api/v1/servers/localhost/cache/flush?domain=."))
self.assert_success_json(r)
else:
from urllib.parse import urljoin
-
DAEMON = os.environ.get('DAEMON', 'authoritative')
PDNSUTIL_CMD = os.environ.get('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')
SQLITE_DB = os.environ.get('SQLITE_DB', 'pdns.sqlite3')
-
+SDIG = os.environ.get('SDIG', 'sdig')
+DNSPORT = os.environ.get('DNSPORT', '53')
class ApiTestCase(unittest.TestCase):
except subprocess.CalledProcessError as except_inst:
raise RuntimeError("pdnsutil %s %s failed: %s" % (command, args, except_inst.output.decode('ascii', errors='replace')))
-
def pdnsutil_rectify(zonename):
"""Run pdnsutil rectify-zone on the given zone."""
pdnsutil('rectify-zone', zonename)
+
+def sdig(*args):
+ try:
+ return subprocess.check_call([SDIG, '127.0.0.1', str(DNSPORT)] + list(args))
+ except subprocess.CalledProcessError as except_inst:
+ raise RuntimeError("sdig %s %s failed: %s" % (command, args, except_inst.output.decode('ascii', errors='replace')))