]>
Commit | Line | Data |
---|---|---|
fa39af54 | 1 | from __future__ import print_function |
e2dba705 CH |
2 | from datetime import datetime |
3 | import os | |
1a152698 | 4 | import requests |
1a152698 | 5 | import unittest |
0bbd0a64 KM |
6 | import mysql.connector |
7 | import psycopg2 | |
1d6b70f9 | 8 | import sqlite3 |
7cbc5255 | 9 | import subprocess |
541bb91b CH |
10 | import sys |
11 | ||
12 | if sys.version_info[0] == 2: | |
13 | from urlparse import urljoin | |
14 | else: | |
15 | from urllib.parse import urljoin | |
16 | ||
# Test-suite configuration, read once from the environment at import time.

# Which daemon is under test; compared against 'authoritative' and 'recursor'.
DAEMON = os.environ.get('DAEMON', 'authoritative')
# pdnsutil invocation as an argv list (the environment value is split on single spaces).
PDNSUTIL_CMD = os.environ.get('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')
# Backend name; 'gmysql', 'gpgsql' and 'lmdb' are the values checked in this file.
BACKEND = os.environ.get('BACKEND', 'gsqlite3')
MYSQL_DB = os.environ.get('MYSQL_DB', 'pdnsapi')
MYSQL_USER = os.environ.get('MYSQL_USER', 'root')
MYSQL_HOST = os.environ.get('MYSQL_HOST', 'localhost')
# The historical, misspelled MYSQL_PASWORD variable is still honoured as a
# fallback so existing environments keep working.
MYSQL_PASSWD = os.environ.get('MYSQL_PASSWORD', os.environ.get('MYSQL_PASWORD', ''))
PGSQL_DB = os.environ.get('PGSQL_DB', 'pdnsapi')
SQLITE_DB = os.environ.get('SQLITE_DB', 'pdns.sqlite3')
# Read from its own variable; it used to pick up SQLITE_DB by mistake.
LMDB_DB = os.environ.get('LMDB_DB', 'pdns.lmdb')
# Name or path of the sdig binary, and the DNS port (kept as a string).
SDIG = os.environ.get('SDIG', 'sdig')
DNSPORT = os.environ.get('DNSPORT', '53')
1a152698 CH |
29 | |
class ApiTestCase(unittest.TestCase):
    """Base class for HTTP API tests against a server on 127.0.0.1.

    ``setUp`` prepares a ``requests`` session carrying the API key and an
    ``Origin`` header; the remaining helpers build URLs and check responses.
    """

    def setUp(self):
        """Set the server coordinates and create the HTTP session."""
        # TODO: config
        self.server_address = '127.0.0.1'
        self.webServerBasicAuthPassword = 'something'
        self.server_port = int(os.environ.get('WEBPORT', '5580'))
        self.server_url = 'http://%s:%s/' % (self.server_address, self.server_port)
        self.server_web_password = os.environ.get('WEBPASSWORD', 'MISSING')
        self.session = requests.Session()
        # Release the session's pooled connections once the test has finished.
        self.addCleanup(self.session.close)
        self.session.headers = {'X-API-Key': os.environ.get('APIKEY', 'changeme-key'), 'Origin': 'http://%s:%s' % (self.server_address, self.server_port)}

    def url(self, relative_url):
        """Return ``relative_url`` joined onto the server base URL."""
        return urljoin(self.server_url, relative_url)

    def assert_success_json(self, result):
        """Check that ``result`` succeeded and has a JSON Content-Type.

        Raises whatever ``result.raise_for_status()`` raises, or an
        assertion failure when the Content-Type is not 'application/json'.
        """
        self.assert_success(result)
        self.assertEqual(result.headers['Content-Type'], 'application/json')

    def assert_error_json(self, result):
        """Check that ``result`` has a 4xx/5xx status and a JSON Content-Type."""
        self.assertTrue(400 <= result.status_code < 600, "Response has not an error code "+str(result.status_code))
        self.assertEqual(result.headers['Content-Type'], 'application/json', "Response status code "+str(result.status_code))

    def assert_success(self, result):
        """Check that ``result`` succeeded, printing the body on failure.

        The exception from ``result.raise_for_status()`` is re-raised
        unchanged after the response content has been printed.
        """
        try:
            result.raise_for_status()
        except Exception:
            # Print the body to help debugging; KeyboardInterrupt and
            # SystemExit are deliberately not intercepted.
            print(result.content)
            raise
63 | ||
e2dba705 CH |
64 | |
def unique_zone_name():
    """Return a zone name under .org. derived from the current local time."""
    # Day, hour, second, minute and microsecond are concatenated as digits.
    stamp = datetime.now().strftime('%d%H%S%M%f')
    return ''.join(('test-', stamp, '.org.'))
7c876c30 | 67 | |
6eca6510 PL |
def unique_tsigkey_name():
    """Return a TSIG key name derived from the current local time."""
    # Day, hour, second, minute and microsecond are concatenated as digits.
    stamp = datetime.now().strftime('%d%H%S%M%f')
    return ''.join(('test-', stamp, '-key'))
7c876c30 | 70 | |
c1374bdb CH |
def is_auth():
    """Return True when DAEMON is set to 'authoritative'."""
    daemon_kind = DAEMON
    return daemon_kind == 'authoritative'
7c876c30 | 73 | |
def is_auth_lmdb():
    """Return True when DAEMON is 'authoritative' and BACKEND is 'lmdb'."""
    if DAEMON != 'authoritative':
        return False
    return BACKEND == 'lmdb'
7c876c30 | 76 | |
c1374bdb CH |
def is_recursor():
    """Return True when DAEMON is set to 'recursor'."""
    daemon_kind = DAEMON
    return daemon_kind == 'recursor'
1d6b70f9 CH |
79 | |
80 | ||
1d6b70f9 CH |
def get_auth_db():
    """Open a connection to the Authoritative backend database.

    Returns a ``(connection, placeholder)`` tuple, where ``placeholder`` is
    the parameter marker to splice into SQL text for that driver.
    """
    if BACKEND == 'gmysql':
        mysql_conn = mysql.connector.connect(
            database=MYSQL_DB,
            user=MYSQL_USER,
            host=MYSQL_HOST,
            password=MYSQL_PASSWD,
        )
        return mysql_conn, "%s"
    if BACKEND == 'gpgsql':
        pg_conn = psycopg2.connect(database=PGSQL_DB)
        return pg_conn, "%s"
    # Every other BACKEND value uses the SQLite database file.
    sqlite_conn = sqlite3.Connection(SQLITE_DB)
    return sqlite_conn, "?"
1d6b70f9 CH |
89 | |
90 | ||
def get_db_records(zonename, qtype):
    """Fetch the records of one type for a zone straight from the backend DB.

    ``zonename`` has any trailing dots stripped before being matched against
    ``domains.name``; ``qtype`` is matched against ``records.type``.

    Returns a list of dicts with the keys 'name', 'type', 'content', 'ttl'
    and 'ordername'. The list is also printed.
    """
    db, placeholder = get_auth_db()
    # Only the driver's parameter marker is spliced into the SQL text; the
    # values themselves are passed as bound parameters.
    query = """
        SELECT name, type, content, ttl, ordername
        FROM records
        WHERE type = """+placeholder+""" AND domain_id = (
            SELECT id FROM domains WHERE name = """+placeholder+"""
        )"""
    try:
        cur = db.cursor()
        try:
            cur.execute(query, (qtype, zonename.rstrip('.')))
            rows = cur.fetchall()
        finally:
            # Close the cursor even when the query fails.
            cur.close()
    finally:
        # Close the connection on every path so a failing query cannot leak it.
        db.close()
    columns = ('name', 'type', 'content', 'ttl', 'ordername')
    recs = [dict(zip(columns, row)) for row in rows]
    print("DB Records:", recs)
    return recs
7cbc5255 CH |
106 | |
107 | ||
541bb91b CH |
def pdnsutil(subcommand, *args):
    """Run a pdnsutil subcommand and return its output decoded as ASCII.

    Raises RuntimeError, including the captured output, when the command
    exits with a non-zero status.
    """
    command = PDNSUTIL_CMD + [subcommand]
    command += list(args)
    try:
        raw_output = subprocess.check_output(command, close_fds=True)
        return raw_output.decode('ascii')
    except subprocess.CalledProcessError as failure:
        captured = failure.output.decode('ascii', errors='replace')
        raise RuntimeError("pdnsutil %s %s failed: %s" % (subcommand, args, captured))
541bb91b | 113 | |
7cbc5255 CH |
def pdnsutil_rectify(zonename):
    """Invoke ``pdnsutil rectify-zone`` for ``zonename``; the output is discarded."""
    subcommand = 'rectify-zone'
    pdnsutil(subcommand, zonename)
d19c22a1 CHB |
117 | |
def sdig(*args):
    """Run sdig against 127.0.0.1 on DNSPORT and return its output as text.

    The extra argument "recurse" is appended when the daemon under test is
    not the authoritative server. Raises RuntimeError, including the
    captured output, when sdig exits with a non-zero status.
    """
    sdig_command_line = [SDIG, '127.0.0.1', str(DNSPORT)]
    sdig_command_line.extend(args)
    if not is_auth():
        sdig_command_line.append("recurse")
    try:
        raw_output = subprocess.check_output(sdig_command_line)
        return raw_output.decode('utf-8')
    except subprocess.CalledProcessError as failure:
        captured = failure.output.decode('ascii', errors='replace')
        raise RuntimeError("sdig %s failed: %s" % (sdig_command_line, captured))