]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_helper.py
auth 4.1 build: Switch to devtoolset 7 for el6
[thirdparty/pdns.git] / regression-tests.api / test_helper.py
1 from datetime import datetime
2 import os
3 import requests
4 import urlparse
5 import unittest
6 import sqlite3
7 import subprocess
8
9 DAEMON = os.environ.get('DAEMON', 'authoritative')
10 PDNSUTIL_CMD = os.environ.get('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')
11 SQLITE_DB = os.environ.get('SQLITE_DB', 'pdns.sqlite3')
12
13
14 class ApiTestCase(unittest.TestCase):
15
16 def setUp(self):
17 # TODO: config
18 self.server_address = '127.0.0.1'
19 self.server_port = int(os.environ.get('WEBPORT', '5580'))
20 self.server_url = 'http://%s:%s/' % (self.server_address, self.server_port)
21 self.session = requests.Session()
22 self.session.headers = {'X-API-Key': os.environ.get('APIKEY', 'changeme-key'), 'Origin': 'http://%s:%s' % (self.server_address, self.server_port)}
23
24 def url(self, relative_url):
25 return urlparse.urljoin(self.server_url, relative_url)
26
27 def assert_success_json(self, result):
28 try:
29 result.raise_for_status()
30 except:
31 print result.content
32 raise
33 self.assertEquals(result.headers['Content-Type'], 'application/json')
34
35 def assert_error_json(self, result):
36 self.assertTrue(400 <= result.status_code < 600, "Response has not an error code "+str(result.status_code))
37 self.assertEquals(result.headers['Content-Type'], 'application/json', "Response status code "+str(result.status_code))
38
39 def assert_success(self, result):
40 try:
41 result.raise_for_status()
42 except:
43 print result.content
44 raise
45
46
47 def unique_zone_name():
48 return 'test-' + datetime.now().strftime('%d%H%S%M%f') + '.org.'
49
50
51 def is_auth():
52 return DAEMON == 'authoritative'
53
54
55 def is_recursor():
56 return DAEMON == 'recursor'
57
58
59 def get_auth_db():
60 """Return Connection to Authoritative backend DB."""
61 return sqlite3.Connection(SQLITE_DB)
62
63
64 def get_db_records(zonename, qtype):
65 with get_auth_db() as db:
66 rows = db.execute("""
67 SELECT name, type, content, ttl
68 FROM records
69 WHERE type = ? AND domain_id = (
70 SELECT id FROM domains WHERE name = ?
71 )""", (qtype, zonename.rstrip('.'))).fetchall()
72 recs = [{'name': row[0], 'type': row[1], 'content': row[2], 'ttl': row[3]} for row in rows]
73 print "DB Records:", recs
74 return recs
75
76
77 def pdnsutil_rectify(zonename):
78 """Run pdnsutil rectify-zone on the given zone."""
79 subprocess.check_call(PDNSUTIL_CMD + ['rectify-zone', zonename])