+from __future__ import print_function
from datetime import datetime
import os
import requests
self.server_address = '127.0.0.1'
self.server_port = int(os.environ.get('WEBPORT', '5580'))
self.server_url = 'http://%s:%s/' % (self.server_address, self.server_port)
+ self.server_web_password = os.environ.get('WEBPASSWORD', 'MISSING')
self.session = requests.Session()
self.session.headers = {'X-API-Key': os.environ.get('APIKEY', 'changeme-key'), 'Origin': 'http://%s:%s' % (self.server_address, self.server_port)}
def unique_zone_name():
return 'test-' + datetime.now().strftime('%d%H%S%M%f') + '.org.'
+def unique_tsigkey_name():
+ return 'test-' + datetime.now().strftime('%d%H%S%M%f') + '-key'
def is_auth():
return DAEMON == 'authoritative'
def get_db_records(zonename, qtype):
with get_auth_db() as db:
rows = db.execute("""
- SELECT name, type, content, ttl
+ SELECT name, type, content, ttl, ordername
FROM records
WHERE type = ? AND domain_id = (
SELECT id FROM domains WHERE name = ?
)""", (qtype, zonename.rstrip('.'))).fetchall()
- recs = [{'name': row[0], 'type': row[1], 'content': row[2], 'ttl': row[3]} for row in rows]
+ recs = [{'name': row[0], 'type': row[1], 'content': row[2], 'ttl': row[3], 'ordername': row[4]} for row in rows]
print("DB Records:", recs)
return recs
return subprocess.check_call([SDIG, '127.0.0.1', str(DNSPORT)] + list(args))
except subprocess.CalledProcessError as except_inst:
raise RuntimeError("sdig %s %s failed: %s" % (command, args, except_inst.output.decode('ascii', errors='replace')))
+
+def get_db_tsigkeys(keyname):
+ with get_auth_db() as db:
+ rows = db.execute("""
+ SELECT name, algorithm, secret
+ FROM tsigkeys
+ WHERE name = ?""", (keyname, )).fetchall()
+ keys = [{'name': row[0], 'algorithm': row[1], 'secret': row[2]} for row in rows]
+ print("DB TSIG keys:", keys)
+ return keys
+