]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_helper.py
Merge pull request #11431 from jroessler-ox/docs-kskzskroll-update
[thirdparty/pdns.git] / regression-tests.api / test_helper.py
1 from __future__ import print_function
2 from datetime import datetime
3 import os
4 import requests
5 import unittest
6 import mysql.connector
7 import psycopg2
8 import sqlite3
9 import subprocess
10 import sys
11
12 if sys.version_info[0] == 2:
13 from urlparse import urljoin
14 else:
15 from urllib.parse import urljoin
16
# Test-run configuration. Every value can be overridden through an
# environment variable of the same name; the second argument is the default.
DAEMON = os.environ.get('DAEMON', 'authoritative')
# The command is split on single spaces so that it can carry extra arguments.
PDNSUTIL_CMD = os.environ.get('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')
BACKEND = os.environ.get('BACKEND', 'gsqlite3')
MYSQL_DB = os.environ.get('MYSQL_DB', 'pdnsapi')
MYSQL_USER = os.environ.get('MYSQL_USER', 'root')
MYSQL_HOST = os.environ.get('MYSQL_HOST', 'localhost')
# Earlier versions only looked at the misspelled MYSQL_PASWORD variable; it is
# still honoured as a fallback so existing setups keep working.
MYSQL_PASSWD = os.environ.get('MYSQL_PASSWORD', os.environ.get('MYSQL_PASWORD', ''))
PGSQL_DB = os.environ.get('PGSQL_DB', 'pdnsapi')
SQLITE_DB = os.environ.get('SQLITE_DB', 'pdns.sqlite3')
# Read from its own variable instead of (mistakenly) reusing SQLITE_DB.
LMDB_DB = os.environ.get('LMDB_DB', 'pdns.lmdb')
SDIG = os.environ.get('SDIG', 'sdig')
DNSPORT = os.environ.get('DNSPORT', '53')
29
class ApiTestCase(unittest.TestCase):
    """Base class for HTTP API tests.

    ``setUp`` records the server address and creates a ``requests`` session
    carrying the ``X-API-Key`` and ``Origin`` headers. The ``assert_*``
    helpers check common properties of a response object.
    """

    def setUp(self):
        """Prepare the server coordinates and an HTTP session for one test."""
        # TODO: config
        self.server_address = '127.0.0.1'
        self.webServerBasicAuthPassword = 'something'
        self.server_port = int(os.environ.get('WEBPORT', '5580'))
        self.server_url = 'http://%s:%s/' % (self.server_address, self.server_port)
        self.server_web_password = os.environ.get('WEBPASSWORD', 'MISSING')
        self.session = requests.Session()
        # Release the session's pooled connections once the test is over,
        # whether it passed or failed.
        self.addCleanup(self.session.close)
        self.session.headers = {
            'X-API-Key': os.environ.get('APIKEY', 'changeme-key'),
            'Origin': 'http://%s:%s' % (self.server_address, self.server_port),
        }

    def url(self, relative_url):
        """Return ``relative_url`` joined onto the server's base URL."""
        return urljoin(self.server_url, relative_url)

    def assert_success_json(self, result):
        """Check that ``result`` is a successful response with a JSON content type.

        Re-raises whatever ``result.raise_for_status()`` raises, or fails the
        test if the ``Content-Type`` header is not exactly ``application/json``.
        """
        self.assert_success(result)
        self.assertEqual(result.headers['Content-Type'], 'application/json')

    def assert_error_json(self, result):
        """Check that ``result`` has a 4xx/5xx status and a JSON content type."""
        self.assertTrue(400 <= result.status_code < 600, "Response has not an error code "+str(result.status_code))
        self.assertEqual(result.headers['Content-Type'], 'application/json', "Response status code "+str(result.status_code))

    def assert_success(self, result):
        """Check that ``result.raise_for_status()`` does not raise.

        On failure the response body is printed first, to help debugging,
        and the original exception is re-raised unchanged.
        """
        try:
            result.raise_for_status()
        except Exception:
            # Not a bare ``except:`` so that KeyboardInterrupt/SystemExit pass
            # straight through without extra output.
            print(result.content)
            raise
63
64
def unique_zone_name():
    """Return a zone name under ``org.`` built from the current local time.

    The name embeds day, hour, second, minute and microsecond of the moment
    the function is called.
    """
    stamp = datetime.now().strftime('%d%H%S%M%f')
    return ''.join(('test-', stamp, '.org.'))
67
def unique_tsigkey_name():
    """Return a TSIG key name built from the current local time.

    The name embeds day, hour, second, minute and microsecond of the moment
    the function is called.
    """
    stamp = datetime.now().strftime('%d%H%S%M%f')
    return ''.join(('test-', stamp, '-key'))
70
def is_auth():
    """Return True when the configured DAEMON is 'authoritative'."""
    daemon_is_auth = (DAEMON == 'authoritative')
    return daemon_is_auth
73
def is_auth_lmdb():
    """Return True when DAEMON is 'authoritative' and BACKEND is 'lmdb'."""
    if not is_auth():
        return False
    return BACKEND == 'lmdb'
76
def is_recursor():
    """Return True when the configured DAEMON is 'recursor'."""
    daemon_is_recursor = (DAEMON == 'recursor')
    return daemon_is_recursor
79
80
def get_auth_db():
    """Open a connection to the Authoritative backend database.

    Returns a ``(connection, placeholder)`` tuple; ``placeholder`` is the
    parameter marker to put into SQL statements run on that connection.
    """
    if BACKEND == 'gmysql':
        connection = mysql.connector.connect(
            database=MYSQL_DB,
            user=MYSQL_USER,
            host=MYSQL_HOST,
            password=MYSQL_PASSWD,
        )
        return connection, "%s"

    if BACKEND == 'gpgsql':
        connection = psycopg2.connect(database=PGSQL_DB)
        return connection, "%s"

    # Every other BACKEND value ends up on the SQLite database file.
    connection = sqlite3.Connection(SQLITE_DB)
    return connection, "?"
89
90
def get_db_records(zonename, qtype):
    """Fetch the records of one type for a zone straight from the backend DB.

    ``zonename`` is looked up in the ``domains`` table with any trailing dots
    stripped; ``qtype`` is matched against the ``type`` column of ``records``.

    Returns a list of dicts with the keys ``name``, ``type``, ``content``,
    ``ttl`` and ``ordername``. The list is also printed.
    """
    db, placeholder = get_auth_db()
    # The placeholder comes from get_auth_db(), not from the caller; the
    # actual values are passed separately as query parameters.
    query = """
        SELECT name, type, content, ttl, ordername
        FROM records
        WHERE type = """+placeholder+""" AND domain_id = (
            SELECT id FROM domains WHERE name = """+placeholder+"""
        )"""
    # try/finally so cursor and connection are closed even if the query fails.
    try:
        cur = db.cursor()
        try:
            cur.execute(query, (qtype, zonename.rstrip('.')))
            rows = cur.fetchall()
        finally:
            cur.close()
    finally:
        db.close()
    recs = [{'name': row[0], 'type': row[1], 'content': row[2], 'ttl': row[3], 'ordername': row[4]} for row in rows]
    print("DB Records:", recs)
    return recs
106
107
def pdnsutil(subcommand, *args):
    """Run ``PDNSUTIL_CMD`` with ``subcommand`` and ``args``; return its stdout.

    The output is decoded as UTF-8 (of which ASCII is a subset), so non-ASCII
    output no longer fails with ``UnicodeDecodeError``.

    Raises ``RuntimeError`` carrying the captured output when the command
    exits with a non-zero status.
    """
    command_line = PDNSUTIL_CMD + [subcommand] + list(args)
    try:
        return subprocess.check_output(command_line, close_fds=True).decode('utf-8')
    except subprocess.CalledProcessError as except_inst:
        # errors='replace' keeps the error report itself from failing on
        # undecodable bytes.
        raise RuntimeError("pdnsutil %s %s failed: %s" % (subcommand, args, except_inst.output.decode('utf-8', errors='replace')))
113
def pdnsutil_rectify(zonename):
    """Invoke ``pdnsutil rectify-zone`` for ``zonename``; the output is discarded."""
    subcommand = 'rectify-zone'
    pdnsutil(subcommand, zonename)
117
def sdig(*args):
    """Run the ``SDIG`` tool against 127.0.0.1:``DNSPORT`` and return its output.

    ``args`` are appended to the command line; when the daemon under test is
    not the authoritative server, a trailing ``recurse`` argument is added.

    Raises ``RuntimeError`` with the captured output if the tool exits with a
    non-zero status.
    """
    sdig_command_line = [SDIG, '127.0.0.1', str(DNSPORT)]
    sdig_command_line.extend(args)
    if not is_auth():
        sdig_command_line.append("recurse")

    try:
        raw_output = subprocess.check_output(sdig_command_line)
    except subprocess.CalledProcessError as failure:
        raise RuntimeError("sdig %s failed: %s" % (sdig_command_line, failure.output.decode('ascii', errors='replace')))
    return raw_output.decode('utf-8')