auto txn = d_ttsig->getRWTransaction();
for (auto range = txn.equal_range<0>(name); range.first != range.second; ++range.first) {
- if (range.first->algorithm == algorithm)
- range.first.del();
+ if (range.first->algorithm == algorithm) {
+ txn.del(range.first.getID());
+ }
}
TSIGKey tk;
TSIGKey tk;
for (auto range = txn.equal_range<0>(name); range.first != range.second; ++range.first) {
- range.first.del();
+ txn.del(range.first.getID());
}
txn.commit();
return true;
import unittest
from copy import deepcopy
from pprint import pprint
-from test_helper import ApiTestCase, unique_tsigkey_name, is_auth, is_auth_lmdb, is_recursor, get_db_tsigkeys
+from test_helper import ApiTestCase, unique_tsigkey_name, is_auth, is_recursor
class AuthTSIGHelperMixin(object):
def create_tsig_key(self, name=None, algorithm='hmac-md5', key=None):
name, payload, data = self.create_tsig_key()
r = self.session.delete(self.url("/api/v1/servers/localhost/tsigkeys/" + data['id']))
self.assertEqual(r.status_code, 204)
-
- if not is_auth_lmdb():
- keys_from_db = get_db_tsigkeys(name)
- self.assertListEqual(keys_from_db, [])
+ r = self.session.get(self.url(
+ "/api/v1/servers/localhost/tsigkeys"),
+ headers={'accept': 'application/json'})
+ self.assertEqual(r.status_code, 200)
+ keys = r.json()
+ self.assertEqual(len([key for key in keys if key['name'] == name]), 0)
def test_put_key_change_name(self):
"""
return subprocess.check_output(sdig_command_line).decode('utf-8')
except subprocess.CalledProcessError as except_inst:
raise RuntimeError("sdig %s failed: %s" % (sdig_command_line, except_inst.output.decode('ascii', errors='replace')))
-
-def get_db_tsigkeys(keyname):
- db, placeholder = get_auth_db()
- cur = db.cursor()
- cur.execute("""
- SELECT name, algorithm, secret
- FROM tsigkeys
- WHERE name = """+placeholder, (keyname, ))
- rows = cur.fetchall()
- cur.close()
- db.close()
- keys = [{'name': row[0], 'algorithm': row[1], 'secret': row[2]} for row in rows]
- print("DB TSIG keys:", keys)
- return keys