]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_helper.py
Merge pull request #7853 from Marlinc/dnsdist-prometheus-histogram
[thirdparty/pdns.git] / regression-tests.api / test_helper.py
1 from __future__ import print_function
2 from datetime import datetime
3 import os
4 import requests
5 import unittest
6 import sqlite3
7 import subprocess
8 import sys
9
10 if sys.version_info[0] == 2:
11 from urlparse import urljoin
12 else:
13 from urllib.parse import urljoin
14
15 DAEMON = os.environ.get('DAEMON', 'authoritative')
16 PDNSUTIL_CMD = os.environ.get('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')
17 SQLITE_DB = os.environ.get('SQLITE_DB', 'pdns.sqlite3')
18 SDIG = os.environ.get('SDIG', 'sdig')
19 DNSPORT = os.environ.get('DNSPORT', '53')
20
21 class ApiTestCase(unittest.TestCase):
22
23 def setUp(self):
24 # TODO: config
25 self.server_address = '127.0.0.1'
26 self.server_port = int(os.environ.get('WEBPORT', '5580'))
27 self.server_url = 'http://%s:%s/' % (self.server_address, self.server_port)
28 self.server_web_password = os.environ.get('WEBPASSWORD', 'MISSING')
29 self.session = requests.Session()
30 self.session.headers = {'X-API-Key': os.environ.get('APIKEY', 'changeme-key'), 'Origin': 'http://%s:%s' % (self.server_address, self.server_port)}
31
32 def url(self, relative_url):
33 return urljoin(self.server_url, relative_url)
34
35 def assert_success_json(self, result):
36 try:
37 result.raise_for_status()
38 except:
39 print(result.content)
40 raise
41 self.assertEquals(result.headers['Content-Type'], 'application/json')
42
43 def assert_error_json(self, result):
44 self.assertTrue(400 <= result.status_code < 600, "Response has not an error code "+str(result.status_code))
45 self.assertEquals(result.headers['Content-Type'], 'application/json', "Response status code "+str(result.status_code))
46
47 def assert_success(self, result):
48 try:
49 result.raise_for_status()
50 except:
51 print(result.content)
52 raise
53
54
55 def unique_zone_name():
56 return 'test-' + datetime.now().strftime('%d%H%S%M%f') + '.org.'
57
58 def unique_tsigkey_name():
59 return 'test-' + datetime.now().strftime('%d%H%S%M%f') + '-key'
60
61 def is_auth():
62 return DAEMON == 'authoritative'
63
64
65 def is_recursor():
66 return DAEMON == 'recursor'
67
68
69 def get_auth_db():
70 """Return Connection to Authoritative backend DB."""
71 return sqlite3.Connection(SQLITE_DB)
72
73
74 def get_db_records(zonename, qtype):
75 with get_auth_db() as db:
76 rows = db.execute("""
77 SELECT name, type, content, ttl, ordername
78 FROM records
79 WHERE type = ? AND domain_id = (
80 SELECT id FROM domains WHERE name = ?
81 )""", (qtype, zonename.rstrip('.'))).fetchall()
82 recs = [{'name': row[0], 'type': row[1], 'content': row[2], 'ttl': row[3], 'ordername': row[4]} for row in rows]
83 print("DB Records:", recs)
84 return recs
85
86
87 def pdnsutil(subcommand, *args):
88 try:
89 return subprocess.check_output(PDNSUTIL_CMD + [subcommand] + list(args), close_fds=True).decode('ascii')
90 except subprocess.CalledProcessError as except_inst:
91 raise RuntimeError("pdnsutil %s %s failed: %s" % (subcommand, args, except_inst.output.decode('ascii', errors='replace')))
92
93 def pdnsutil_rectify(zonename):
94 """Run pdnsutil rectify-zone on the given zone."""
95 pdnsutil('rectify-zone', zonename)
96
97 def sdig(*args):
98 try:
99 return subprocess.check_call([SDIG, '127.0.0.1', str(DNSPORT)] + list(args))
100 except subprocess.CalledProcessError as except_inst:
101 raise RuntimeError("sdig %s %s failed: %s" % (command, args, except_inst.output.decode('ascii', errors='replace')))
102
103 def get_db_tsigkeys(keyname):
104 with get_auth_db() as db:
105 rows = db.execute("""
106 SELECT name, algorithm, secret
107 FROM tsigkeys
108 WHERE name = ?""", (keyname, )).fetchall()
109 keys = [{'name': row[0], 'algorithm': row[1], 'secret': row[2]} for row in rows]
110 print("DB TSIG keys:", keys)
111 return keys
112