1 from __future__
import print_function
2 from datetime
import datetime
12 if sys
.version_info
[0] == 2:
13 from urlparse
import urljoin
15 from urllib
.parse
import urljoin
17 DAEMON
= os
.environ
.get('DAEMON', 'authoritative')
18 PDNSUTIL_CMD
= os
.environ
.get('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')
19 BACKEND
= os
.environ
.get('BACKEND', 'gsqlite3')
20 MYSQL_DB
= os
.environ
.get('MYSQL_DB', 'pdnsapi')
21 MYSQL_USER
= os
.environ
.get('MYSQL_USER', 'root')
22 MYSQL_HOST
= os
.environ
.get('MYSQL_HOST', 'localhost')
23 MYSQL_PASSWD
= os
.environ
.get('MYSQL_PASWORD', '')
24 PGSQL_DB
= os
.environ
.get('PGSQL_DB', 'pdnsapi')
25 SQLITE_DB
= os
.environ
.get('SQLITE_DB', 'pdns.sqlite3')
26 LMDB_DB
= os
.environ
.get('SQLITE_DB', 'pdns.lmdb')
27 SDIG
= os
.environ
.get('SDIG', 'sdig')
28 DNSPORT
= os
.environ
.get('DNSPORT', '53')
30 class ApiTestCase(unittest
.TestCase
):
34 self
.server_address
= '127.0.0.1'
35 self
.webServerBasicAuthPassword
= 'something'
36 self
.server_port
= int(os
.environ
.get('WEBPORT', '5580'))
37 self
.server_url
= 'http://%s:%s/' % (self
.server_address
, self
.server_port
)
38 self
.server_web_password
= os
.environ
.get('WEBPASSWORD', 'MISSING')
39 self
.session
= requests
.Session()
40 self
.session
.headers
= {'X-API-Key': os
.environ
.get('APIKEY', 'changeme-key'), 'Origin': 'http://%s:%s' % (self
.server_address
, self
.server_port
)}
42 def url(self
, relative_url
):
43 return urljoin(self
.server_url
, relative_url
)
45 def assert_success_json(self
, result
):
47 result
.raise_for_status()
51 self
.assertEqual(result
.headers
['Content-Type'], 'application/json')
53 def assert_error_json(self
, result
):
54 self
.assertTrue(400 <= result
.status_code
< 600, "Response has not an error code "+str(result
.status_code
))
55 self
.assertEqual(result
.headers
['Content-Type'], 'application/json', "Response status code "+str(result
.status_code
))
57 def assert_success(self
, result
):
59 result
.raise_for_status()
65 def unique_zone_name():
66 return 'test-' + datetime
.now().strftime('%d%H%S%M%f') + '.org.'
68 def unique_tsigkey_name():
69 return 'test-' + datetime
.now().strftime('%d%H%S%M%f') + '-key'
72 return DAEMON
== 'authoritative'
75 return DAEMON
== 'authoritative' and BACKEND
== 'lmdb'
78 return DAEMON
== 'recursor'
82 """Return Connection to Authoritative backend DB."""
83 if BACKEND
== 'gmysql':
84 return mysql
.connector
.connect(database
=MYSQL_DB
, user
=MYSQL_USER
, host
=MYSQL_HOST
, password
=MYSQL_PASSWD
), "%s"
85 elif BACKEND
== 'gpgsql':
86 return psycopg2
.connect(database
=PGSQL_DB
), "%s"
88 return sqlite3
.Connection(SQLITE_DB
), "?"
91 def get_db_records(zonename
, qtype
):
92 db
, placeholder
= get_auth_db()
95 SELECT name, type, content, ttl, ordername
97 WHERE type = """+placeholder
+""" AND domain_id = (
98 SELECT id FROM domains WHERE name = """+placeholder
+"""
99 )""", (qtype
, zonename
.rstrip('.')))
100 rows
= cur
.fetchall()
103 recs
= [{'name': row
[0], 'type': row
[1], 'content': row
[2], 'ttl': row
[3], 'ordername': row
[4]} for row
in rows
]
104 print("DB Records:", recs
)
108 def pdnsutil(subcommand
, *args
):
110 return subprocess
.check_output(PDNSUTIL_CMD
+ [subcommand
] + list(args
), close_fds
=True).decode('ascii')
111 except subprocess
.CalledProcessError
as except_inst
:
112 raise RuntimeError("pdnsutil %s %s failed: %s" % (subcommand
, args
, except_inst
.output
.decode('ascii', errors
='replace')))
114 def pdnsutil_rectify(zonename
):
115 """Run pdnsutil rectify-zone on the given zone."""
116 pdnsutil('rectify-zone', zonename
)
120 sdig_command_line
= [SDIG
, '127.0.0.1', str(DNSPORT
)] + list(args
)
122 sdig_command_line
= [SDIG
, '127.0.0.1', str(DNSPORT
)] + list(args
) + ["recurse"]
124 return subprocess
.check_output(sdig_command_line
).decode('utf-8')
125 except subprocess
.CalledProcessError
as except_inst
:
126 raise RuntimeError("sdig %s failed: %s" % (sdig_command_line
, except_inst
.output
.decode('ascii', errors
='replace')))