]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.api/test_helper.py
Make sure we can install unsigned packages.
[thirdparty/pdns.git] / regression-tests.api / test_helper.py
CommitLineData
fa39af54 1from __future__ import print_function
e2dba705
CH
2from datetime import datetime
3import os
1a152698 4import requests
1a152698 5import unittest
1d6b70f9 6import sqlite3
7cbc5255 7import subprocess
541bb91b
CH
8import sys
9
10if sys.version_info[0] == 2:
11 from urlparse import urljoin
12else:
13 from urllib.parse import urljoin
14
# Test configuration, read once from the process environment at import time.

# Name of the daemon under test; is_auth() / is_recursor() compare against it.
DAEMON = os.getenv('DAEMON', 'authoritative')
# pdnsutil command line as a list, obtained by splitting on single spaces.
PDNSUTIL_CMD = os.getenv('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')
# Path of the SQLite database file opened by get_auth_db().
SQLITE_DB = os.getenv('SQLITE_DB', 'pdns.sqlite3')
# Executable name or path of the sdig tool run by sdig().
SDIG = os.getenv('SDIG', 'sdig')
# DNS port handed to sdig; kept as a string, exactly as read from the environment.
DNSPORT = os.getenv('DNSPORT', '53')
1a152698
CH
20
class ApiTestCase(unittest.TestCase):
    """Base class for the HTTP API regression tests.

    ``setUp`` derives the server URL from the environment and creates a
    ``requests`` session that sends the API key with every request.  The
    ``assert_*`` helpers validate response objects exposing ``status_code``,
    ``headers``, ``content`` and ``raise_for_status()``.
    """

    def setUp(self):
        """Prepare server address, credentials and the HTTP session."""
        # TODO: config
        self.server_address = '127.0.0.1'
        self.webServerBasicAuthPassword = 'something'
        self.server_port = int(os.environ.get('WEBPORT', '5580'))
        self.server_url = 'http://%s:%s/' % (self.server_address, self.server_port)
        self.server_web_password = os.environ.get('WEBPASSWORD', 'MISSING')
        self.session = requests.Session()
        # Assigning (rather than updating) replaces the session's header set.
        self.session.headers = {'X-API-Key': os.environ.get('APIKEY', 'changeme-key'), 'Origin': 'http://%s:%s' % (self.server_address, self.server_port)}

    def url(self, relative_url):
        """Return ``relative_url`` resolved against the server base URL."""
        return urljoin(self.server_url, relative_url)

    def assert_success_json(self, result):
        """Fail unless ``result`` is a successful response with a JSON content type.

        On an HTTP error the response body is printed and the exception from
        ``raise_for_status()`` is re-raised.
        """
        self.assert_success(result)
        # assertEqual, not the deprecated assertEquals alias (removed in Python 3.12).
        self.assertEqual(result.headers['Content-Type'], 'application/json')

    def assert_error_json(self, result):
        """Fail unless ``result`` has a 4xx/5xx status and a JSON content type."""
        self.assertTrue(400 <= result.status_code < 600, "Response has not an error code "+str(result.status_code))
        self.assertEqual(result.headers['Content-Type'], 'application/json', "Response status code "+str(result.status_code))

    def assert_success(self, result):
        """Re-raise the error from ``result.raise_for_status()``, printing the body first."""
        try:
            result.raise_for_status()
        except Exception:
            # Show the server's reply to make the failure diagnosable.
            print(result.content)
            raise
54
e2dba705
CH
55
def unique_zone_name():
    """Return a zone name of the form ``test-<timestamp digits>.org.``.

    The digits come from the current local time, down to microseconds.
    """
    stamp = datetime.now().strftime('%d%H%S%M%f')
    return ''.join(('test-', stamp, '.org.'))
7c876c30 58
6eca6510
PL
def unique_tsigkey_name():
    """Return a TSIG key name of the form ``test-<timestamp digits>-key``.

    The digits come from the current local time, down to microseconds.
    """
    stamp = datetime.now().strftime('%d%H%S%M%f')
    return ''.join(('test-', stamp, '-key'))
7c876c30 61
c1374bdb
CH
def is_auth():
    """Return True when the configured DAEMON is 'authoritative'."""
    expected = 'authoritative'
    return DAEMON == expected
7c876c30
CH
64
65
c1374bdb
CH
def is_recursor():
    """Return True when the configured DAEMON is 'recursor'."""
    expected = 'recursor'
    return DAEMON == expected
1d6b70f9
CH
68
69
1d6b70f9
CH
def get_auth_db():
    """Open and return a new connection to the SQLite file named by SQLITE_DB.

    The caller owns the returned connection.
    """
    connection = sqlite3.connect(SQLITE_DB)
    return connection
73
74
def get_db_records(zonename, qtype):
    """Return the records of type ``qtype`` stored in the DB for ``zonename``.

    Trailing dots are stripped from ``zonename`` before it is matched against
    the ``domains`` table.  Each record is a dict with the keys ``name``,
    ``type``, ``content``, ``ttl`` and ``ordername``.  The result is also
    printed, which helps when reading test output.
    """
    columns = ('name', 'type', 'content', 'ttl', 'ordername')
    db = get_auth_db()
    try:
        rows = db.execute("""
            SELECT name, type, content, ttl, ordername
            FROM records
            WHERE type = ? AND domain_id = (
                SELECT id FROM domains WHERE name = ?
            )""", (qtype, zonename.rstrip('.'))).fetchall()
    finally:
        # Using the connection as a context manager only commits or rolls
        # back; it never closes, so close explicitly to avoid leaking handles.
        db.close()
    recs = [dict(zip(columns, row)) for row in rows]
    print("DB Records:", recs)
    return recs
7cbc5255
CH
86
87
541bb91b
CH
def pdnsutil(subcommand, *args):
    """Run ``PDNSUTIL_CMD subcommand args...`` and return its output as text.

    The captured output is decoded as ASCII.  When the command exits with a
    non-zero status, RuntimeError is raised with the captured output included
    in the message.
    """
    argv = PDNSUTIL_CMD + [subcommand] + list(args)
    try:
        raw_output = subprocess.check_output(argv, close_fds=True)
    except subprocess.CalledProcessError as except_inst:
        details = except_inst.output.decode('ascii', errors='replace')
        raise RuntimeError("pdnsutil %s %s failed: %s" % (subcommand, args, details))
    return raw_output.decode('ascii')
541bb91b 93
7cbc5255
CH
def pdnsutil_rectify(zonename):
    """Invoke ``pdnsutil rectify-zone`` for ``zonename``; the output is discarded."""
    subcommand = 'rectify-zone'
    pdnsutil(subcommand, zonename)
d19c22a1
CHB
97
def sdig(*args):
    """Run the SDIG tool against 127.0.0.1 on DNSPORT with the extra ``args``.

    Returns the value of ``subprocess.check_call`` (0 on success).  Raises
    RuntimeError when the tool exits with a non-zero status.
    """
    try:
        return subprocess.check_call([SDIG, '127.0.0.1', str(DNSPORT)] + list(args))
    except subprocess.CalledProcessError as except_inst:
        # check_call() does not capture output, so .output is normally None;
        # decode it only when something was actually captured.
        captured = except_inst.output
        details = captured.decode('ascii', errors='replace') if captured else ''
        raise RuntimeError("sdig %s failed with exit status %s: %s" % (args, except_inst.returncode, details))
6eca6510
PL
103
def get_db_tsigkeys(keyname):
    """Return the TSIG keys stored in the DB under the name ``keyname``.

    Each key is a dict with the keys ``name``, ``algorithm`` and ``secret``.
    The result is also printed, which helps when reading test output.
    """
    columns = ('name', 'algorithm', 'secret')
    db = get_auth_db()
    try:
        rows = db.execute("""
            SELECT name, algorithm, secret
            FROM tsigkeys
            WHERE name = ?""", (keyname, )).fetchall()
    finally:
        # Using the connection as a context manager only commits or rolls
        # back; it never closes, so close explicitly to avoid leaking handles.
        db.close()
    keys = [dict(zip(columns, row)) for row in rows]
    print("DB TSIG keys:", keys)
    return keys
113