]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_helper.py
1 from __future__
import print_function
2 from datetime
import datetime
10 if sys
.version_info
[0] == 2:
11 from urlparse
import urljoin
13 from urllib
.parse
import urljoin
15 DAEMON
= os
.environ
.get('DAEMON', 'authoritative')
16 PDNSUTIL_CMD
= os
.environ
.get('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')
17 SQLITE_DB
= os
.environ
.get('SQLITE_DB', 'pdns.sqlite3')
18 SDIG
= os
.environ
.get('SDIG', 'sdig')
19 DNSPORT
= os
.environ
.get('DNSPORT', '53')
21 class ApiTestCase(unittest
.TestCase
):
25 self
.server_address
= '127.0.0.1'
26 self
.server_port
= int(os
.environ
.get('WEBPORT', '5580'))
27 self
.server_url
= 'http://%s:%s/' % (self
.server_address
, self
.server_port
)
28 self
.session
= requests
.Session()
29 self
.session
.headers
= {'X-API-Key': os
.environ
.get('APIKEY', 'changeme-key'), 'Origin': 'http://%s:%s' % (self
.server_address
, self
.server_port
)}
31 def url(self
, relative_url
):
32 return urljoin(self
.server_url
, relative_url
)
34 def assert_success_json(self
, result
):
36 result
.raise_for_status()
40 self
.assertEquals(result
.headers
['Content-Type'], 'application/json')
42 def assert_error_json(self
, result
):
43 self
.assertTrue(400 <= result
.status_code
< 600, "Response has not an error code "+str(result
.status_code
))
44 self
.assertEquals(result
.headers
['Content-Type'], 'application/json', "Response status code "+str(result
.status_code
))
46 def assert_success(self
, result
):
48 result
.raise_for_status()
54 def unique_zone_name():
55 return 'test-' + datetime
.now().strftime('%d%H%S%M%f') + '.org.'
57 def unique_tsigkey_name():
58 return 'test-' + datetime
.now().strftime('%d%H%S%M%f') + '-key'
61 return DAEMON
== 'authoritative'
65 return DAEMON
== 'recursor'
69 """Return Connection to Authoritative backend DB."""
70 return sqlite3
.Connection(SQLITE_DB
)
73 def get_db_records(zonename
, qtype
):
74 with
get_auth_db() as db
:
76 SELECT name, type, content, ttl, ordername
78 WHERE type = ? AND domain_id = (
79 SELECT id FROM domains WHERE name = ?
80 )""", (qtype
, zonename
.rstrip('.'))).fetchall()
81 recs
= [{'name': row
[0], 'type': row
[1], 'content': row
[2], 'ttl': row
[3], 'ordername': row
[4]} for row
in rows
]
82 print("DB Records:", recs
)
86 def pdnsutil(subcommand
, *args
):
88 return subprocess
.check_output(PDNSUTIL_CMD
+ [subcommand
] + list(args
), close_fds
=True).decode('ascii')
89 except subprocess
.CalledProcessError
as except_inst
:
90 raise RuntimeError("pdnsutil %s %s failed: %s" % (subcommand
, args
, except_inst
.output
.decode('ascii', errors
='replace')))
92 def pdnsutil_rectify(zonename
):
93 """Run pdnsutil rectify-zone on the given zone."""
94 pdnsutil('rectify-zone', zonename
)
98 return subprocess
.check_call([SDIG
, '127.0.0.1', str(DNSPORT
)] + list(args
))
99 except subprocess
.CalledProcessError
as except_inst
:
100 raise RuntimeError("sdig %s %s failed: %s" % (command
, args
, except_inst
.output
.decode('ascii', errors
='replace')))
102 def get_db_tsigkeys(keyname
):
103 with
get_auth_db() as db
:
104 rows
= db
.execute("""
105 SELECT name, algorithm, secret
107 WHERE name = ?""", (keyname
, )).fetchall()
108 keys
= [{'name': row
[0], 'algorithm': row
[1], 'secret': row
[2]} for row
in rows
]
109 print("DB TSIG keys:", keys
)