# git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_helper.py
# Merge 'master', but take geoipbackend from master as
# [thirdparty/pdns.git] / regression-tests.api / test_helper.py
1 from datetime import datetime
2 import os
3 import requests
4 import unittest
5 import sqlite3
6 import subprocess
7 import sys
8
9 if sys.version_info[0] == 2:
10 from urlparse import urljoin
11 else:
12 from urllib.parse import urljoin
13
14
# Test configuration, read from the environment with fallbacks.

# Name of the daemon under test; is_auth()/is_recursor() compare it against
# 'authoritative' and 'recursor'.
DAEMON = os.getenv('DAEMON', 'authoritative')

# Command prefix for pdnsutil, split on single spaces into an argv-style list
# (the placeholder default shows that the value may contain several words).
PDNSUTIL_CMD = os.getenv('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')

# Database path handed to sqlite3 by get_auth_db().
SQLITE_DB = os.getenv('SQLITE_DB', 'pdns.sqlite3')
18
19
class ApiTestCase(unittest.TestCase):
    """Base test case for tests that talk to the HTTP API.

    ``setUp`` prepares a ``requests`` session carrying the API key and an
    ``Origin`` header; the helper methods build absolute URLs and check
    responses.
    """

    def setUp(self):
        """Compute the server URL and create the HTTP session for one test."""
        # TODO: config
        self.server_address = '127.0.0.1'
        self.server_port = int(os.environ.get('WEBPORT', '5580'))
        self.server_url = 'http://%s:%s/' % (self.server_address, self.server_port)
        self.session = requests.Session()
        # Close the session after the test so its connections are not leaked.
        self.addCleanup(self.session.close)
        # Note: this replaces the session's header mapping outright rather
        # than adding to it.
        self.session.headers = {
            'X-API-Key': os.environ.get('APIKEY', 'changeme-key'),
            'Origin': 'http://%s:%s' % (self.server_address, self.server_port),
        }

    def url(self, relative_url):
        """Return ``relative_url`` joined onto the server's base URL."""
        return urljoin(self.server_url, relative_url)

    def assert_success(self, result):
        """Check that ``result.raise_for_status()`` does not raise.

        On failure the response body is printed to aid debugging and the
        original exception is re-raised unchanged.
        """
        try:
            result.raise_for_status()
        except Exception:
            print(result.content)
            raise

    def assert_success_json(self, result):
        """Check that the response succeeded and is ``application/json``."""
        self.assert_success(result)
        self.assertEqual(result.headers['Content-Type'], 'application/json')

    def assert_error_json(self, result):
        """Check that the response is a 4xx/5xx error with a JSON body."""
        self.assertTrue(400 <= result.status_code < 600, "Response has not an error code "+str(result.status_code))
        self.assertEqual(result.headers['Content-Type'], 'application/json', "Response status code "+str(result.status_code))
51
52
def unique_zone_name():
    """Build a zone name of the form ``test-<timestamp>.org.``.

    The timestamp is the current local time rendered as day, hour, second,
    minute and microsecond digits.
    """
    stamp = datetime.now().strftime('%d%H%S%M%f')
    return ''.join(('test-', stamp, '.org.'))
55
56
def is_auth():
    """Return True when DAEMON is set to 'authoritative'."""
    wanted = 'authoritative'
    return DAEMON == wanted
59
60
def is_recursor():
    """Return True when DAEMON is set to 'recursor'."""
    wanted = 'recursor'
    return DAEMON == wanted
63
64
def get_auth_db():
    """Open and return a sqlite3 connection to the database at SQLITE_DB."""
    connection = sqlite3.Connection(SQLITE_DB)
    return connection
68
69
def get_db_records(zonename, qtype):
    """Read the records of one type for a zone directly from the database.

    zonename: zone to look up in ``domains``; trailing dots are stripped
        before the comparison.
    qtype: value matched against the ``type`` column of ``records``.

    Returns a list of dicts with the keys ``name``, ``type``, ``content``
    and ``ttl``. The list is also printed as a debugging aid.
    """
    # Local import keeps this function self-contained.
    from contextlib import closing

    # A sqlite3 connection used directly in ``with`` only commits or rolls
    # back; it is not closed. ``closing`` makes sure it is released.
    with closing(get_auth_db()) as db:
        rows = db.execute("""
            SELECT name, type, content, ttl
            FROM records
            WHERE type = ? AND domain_id = (
                SELECT id FROM domains WHERE name = ?
            )""", (qtype, zonename.rstrip('.'))).fetchall()
    recs = [{'name': row[0], 'type': row[1], 'content': row[2], 'ttl': row[3]} for row in rows]
    print("DB Records:", recs)
    return recs
81
82
def pdnsutil(subcommand, *args):
    """Run ``PDNSUTIL_CMD <subcommand> <args...>`` and return its stdout.

    subcommand: pdnsutil subcommand name, appended after PDNSUTIL_CMD.
    args: further command-line arguments, passed through as given.

    Returns the captured standard output decoded as ASCII.
    Raises RuntimeError, including the captured output, when the command
    exits with a non-zero status.
    """
    command_line = PDNSUTIL_CMD + [subcommand] + list(args)
    try:
        return subprocess.check_output(command_line, close_fds=True).decode('ascii')
    except subprocess.CalledProcessError as except_inst:
        # Report the subcommand that was actually run; the previous code
        # referenced an undefined name here and raised NameError instead.
        raise RuntimeError("pdnsutil %s %s failed: %s" % (subcommand, args, except_inst.output.decode('ascii', errors='replace')))
88
89
def pdnsutil_rectify(zonename):
    """Invoke the ``rectify-zone`` pdnsutil subcommand for ``zonename``."""
    subcommand = 'rectify-zone'
    pdnsutil(subcommand, zonename)