]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
api tests: fix up for py3k and other cleanup
authorChris Hofstaedtler <chris.hofstaedtler@deduktiva.com>
Tue, 6 Mar 2018 07:40:53 +0000 (08:40 +0100)
committerChris Hofstaedtler <chris.hofstaedtler@deduktiva.com>
Tue, 6 Mar 2018 07:40:53 +0000 (08:40 +0100)
regression-tests.api/runtests.py
regression-tests.api/test_Basics.py
regression-tests.api/test_Servers.py
regression-tests.api/test_Zones.py
regression-tests.api/test_cryptokeys.py
regression-tests.api/test_helper.py

index 8332fa465639a5cd18fbc6e26b7a5457486ed09c..9d2fb6c602992f5f3d66f0e769808f64ba76adc9 100755 (executable)
@@ -2,6 +2,7 @@
 #
 # Shell-script style.
 
+from __future__ import print_function
 import os
 import requests
 import shutil
@@ -70,7 +71,7 @@ def format_call_args(cmd):
 
 
 def run_check_call(cmd, *args, **kwargs):
-    print format_call_args(cmd)
+    print(format_call_args(cmd))
     subprocess.check_call(cmd, *args, **kwargs)
 
 
@@ -86,7 +87,7 @@ tests = [opt.split('=', 1)[1] for opt in tests]
 
 daemon = (len(sys.argv) == 2) and sys.argv[1] or None
 if daemon not in ('authoritative', 'recursor'):
-    print "Usage: ./runtests (authoritative|recursor)"
+    print("Usage: ./runtests (authoritative|recursor)")
     sys.exit(2)
 
 daemon = sys.argv[1]
@@ -151,11 +152,11 @@ else:
 
 
 # Now run pdns and the tests.
-print "Launching server..."
-print format_call_args(servercmd)
+print("Launching server...")
+print(format_call_args(servercmd))
 serverproc = subprocess.Popen(servercmd, close_fds=True)
 
-print "Waiting for webserver port to become available..."
+print("Waiting for webserver port to become available...")
 available = False
 for try_number in range(0, 10):
     try:
@@ -166,15 +167,15 @@ for try_number in range(0, 10):
         time.sleep(0.5)
 
 if not available:
-    print "Webserver port not reachable after 10 tries, giving up."
+    print("Webserver port not reachable after 10 tries, giving up.")
     serverproc.terminate()
     serverproc.wait()
     sys.exit(2)
 
-print "Query for example.com/A to create statistic data..."
+print("Query for example.com/A to create statistic data...")
 run_check_call([sdig, "127.0.0.1", str(DNSPORT), "example.com", "A"])
 
-print "Running tests..."
+print("Running tests...")
 returncode = 0
 test_env = {}
 test_env.update(os.environ)
@@ -187,13 +188,13 @@ test_env.update({
 })
 
 try:
-    print ""
+    print("")
     run_check_call(["nosetests", "--with-xunit", "-v"] + tests, env=test_env)
 except subprocess.CalledProcessError as ex:
     returncode = ex.returncode
 finally:
     if wait:
-        print "Waiting as requested, press ENTER to stop."
+        print("Waiting as requested, press ENTER to stop.")
         raw_input()
     serverproc.terminate()
     serverproc.wait()
index a3d4d8136ca29af451a1bac247083ce8166f3df2..eca56505de2955124cd218f42d5a026178342022 100644 (file)
@@ -20,16 +20,16 @@ class TestBasics(ApiTestCase):
         print("Sending request")
         for part in parts:
             print("Sending %s" % part)
-            s.sendall(part)
+            s.sendall(part.encode('ascii'))
             time.sleep(0.5)
 
         resp = s.recv(4096, socket.MSG_WAITALL)
         s.close()
 
-        print "response", repr(resp)
+        print("response", repr(resp))
 
         status = resp.splitlines(0)[0]
-        if '400' in status:
+        if b'400' in status:
             raise Exception('Got unwanted response: %s' % status)
 
     def test_cors(self):
@@ -41,4 +41,4 @@ class TestBasics(ApiTestCase):
         self.assertEquals(r.headers['access-control-allow-headers'], 'Content-Type, X-API-Key')
         self.assertEquals(r.headers['access-control-allow-methods'], 'GET, POST, PUT, PATCH, DELETE, OPTIONS')
 
-        print "response", repr(r.headers)
+        print("response", repr(r.headers))
index 10bd99d8804bd437f0a94b21159ade7030ddab33..400a6e47fd49cb87ee040b7e8b416e741afb1351 100644 (file)
@@ -42,7 +42,7 @@ class Servers(ApiTestCase):
         data = r.json()
         self.assertIn('uptime', [e['name'] for e in data])
         if is_auth():
-            print data
+            print(data)
             qtype_stats, respsize_stats, queries_stats = None, None, None
             for elem in data:
                 if elem['type'] == 'MapStatisticItem' and elem['name'] == 'queries-by-qtype':
index ece4a0df23c2a36a609e2b6bc97e48ec85cab3c9..6a456d9012d87347bcc32ac7a0b79b8db2e3f65d 100644 (file)
@@ -1,3 +1,4 @@
+from __future__ import print_function
 import json
 import time
 import unittest
@@ -23,23 +24,23 @@ def get_first_rec(data, qname, qtype):
 def eq_zone_rrsets(rrsets, expected):
     data_got = {}
     data_expected = {}
-    for type_, expected_records in expected.iteritems():
+    for type_, expected_records in expected.items():
         type_ = str(type_)
         data_got[type_] = set()
         data_expected[type_] = set()
         uses_name = any(['name' in expected_record for expected_record in expected_records])
         # minify + convert received data
         for rrset in [rrset for rrset in rrsets if rrset['type'] == type_]:
-            print rrset
+            print(rrset)
             for r in rrset['records']:
                 data_got[type_].add((rrset['name'] if uses_name else '@', rrset['type'], r['content']))
         # minify expected data
         for r in expected_records:
             data_expected[type_].add((r['name'] if uses_name else '@', type_, r['content']))
 
-    print "eq_zone_rrsets: got:"
+    print("eq_zone_rrsets: got:")
     pprint(data_got)
-    print "eq_zone_rrsets: expected:"
+    print("eq_zone_rrsets: expected:")
     pprint(data_expected)
 
     assert data_got == data_expected, "%r != %r" % (data_got, data_expected)
@@ -79,7 +80,7 @@ class AuthZonesHelperMixin(object):
                 del payload[k]
             else:
                 payload[k] = v
-        print "sending", payload
+        print("sending", payload)
         r = self.session.post(
             self.url("/api/v1/servers/localhost/zones"),
             data=json.dumps(payload),
@@ -87,7 +88,7 @@ class AuthZonesHelperMixin(object):
         self.assert_success_json(r)
         self.assertEquals(r.status_code, 201)
         reply = r.json()
-        print "reply", reply
+        print("reply", reply)
         return name, payload, reply
 
 
@@ -120,7 +121,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
             if k in payload:
                 self.assertEquals(data[k], payload[k])
         # generated EPOCH serial surely is > fixed serial we passed in
-        print data
+        print(data)
         self.assertGreater(data['serial'], payload['serial'])
         soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
         self.assertGreater(soa_serial, payload['serial'])
@@ -129,7 +130,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
     def test_create_zone_with_account(self):
         # soa_edit_api wins over serial
         name, payload, data = self.create_zone(account='anaccount', serial=10)
-        print data
+        print(data)
         for k in ('account', ):
             self.assertIn(k, data)
             if k in payload:
@@ -137,12 +138,12 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
 
     def test_create_zone_default_soa_edit_api(self):
         name, payload, data = self.create_zone()
-        print data
+        print(data)
         self.assertEquals(data['soa_edit_api'], 'DEFAULT')
 
     def test_create_zone_with_soa_edit(self):
         name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
-        print data
+        print(data)
         self.assertEquals(data['soa_edit'], 'INCEPTION-INCREMENT')
         self.assertEquals(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
         soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
@@ -222,7 +223,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
             'kind': 'Native',
             'nameservers': ['uncanon.example.com']
         }
-        print payload
+        print(payload)
         r = self.session.post(
             self.url("/api/v1/servers/localhost/zones"),
             data=json.dumps(payload),
@@ -236,7 +237,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
             'name': '',
             'kind': 'Native',
         }
-        print payload
+        print(payload)
         r = self.session.post(
             self.url("/api/v1/servers/localhost/zones"),
             data=json.dumps(payload),
@@ -268,7 +269,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
             'kind': 'Native',
             'nameservers': ['ns1.example.com.']
         }
-        print payload
+        print(payload)
         r = self.session.post(
             self.url("/api/v1/servers/localhost/zones"),
             data=json.dumps(payload),
@@ -283,7 +284,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
             'kind': 'Native',
             'nameservers': ['ns1.example.com']
         }
-        print payload
+        print(payload)
         r = self.session.post(
             self.url("/api/v1/servers/localhost/zones"),
             data=json.dumps(payload),
@@ -308,7 +309,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
             'nameservers': ['ns1.example.com.'],
             'rrsets': [rrset],
         }
-        print payload
+        print(payload)
         r = self.session.post(
             self.url("/api/v1/servers/localhost/zones"),
             data=json.dumps(payload),
@@ -333,7 +334,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
             'nameservers': ['ns1.example.com.'],
             'rrsets': [rrset],
         }
-        print payload
+        print(payload)
         r = self.session.post(
             self.url("/api/v1/servers/localhost/zones"),
             data=json.dumps(payload),
@@ -360,7 +361,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
             'kind': 'Native',
             'nameservers': [{'a': 'ns1.example.com'}]  # invalid
         }
-        print payload
+        print(payload)
         r = self.session.post(
             self.url("/api/v1/servers/localhost/zones"),
             data=json.dumps(payload),
@@ -385,7 +386,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
 
         keys = r.json()
 
-        print keys
+        print(keys)
 
         self.assertEquals(r.status_code, 200)
         self.assertEquals(len(keys), 1)
@@ -436,7 +437,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
 
         data = r.json()
 
-        print data
+        print(data)
 
         self.assertEquals(r.status_code, 200)
         self.assertEquals(len(data['metadata']), 1)
@@ -463,7 +464,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
 
         data = r.json()
 
-        print data
+        print(data)
 
         self.assertEquals(r.status_code, 200)
         self.assertEquals(len(data['metadata']), 1)
@@ -540,17 +541,17 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         for k in ('name', 'masters', 'kind'):
             self.assertIn(k, data)
             self.assertEquals(data[k], payload[k])
-        print "payload:", payload
-        print "data:", data
+        print("payload:", payload)
+        print("data:", data)
         # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
         r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
         zonelist = r.json()
-        print "zonelist:", zonelist
+        print("zonelist:", zonelist)
         self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
         # Also test that fetching the zone works.
         r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
         data = r.json()
-        print "zone (fetched):", data
+        print("zone (fetched):", data)
         for k in ('name', 'masters', 'kind'):
             self.assertIn(k, data)
             self.assertEquals(data[k], payload[k])
@@ -564,21 +565,21 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
 
     def test_retrieve_slave_zone(self):
         name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
-        print "payload:", payload
-        print "data:", data
+        print("payload:", payload)
+        print("data:", data)
         r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/axfr-retrieve"))
         data = r.json()
-        print "status for axfr-retrieve:", data
+        print("status for axfr-retrieve:", data)
         self.assertEqual(data['result'], u'Added retrieval request for \'' + payload['name'] +
                          '\' from master 127.0.0.2')
 
     def test_notify_master_zone(self):
         name, payload, data = self.create_zone(kind='Master')
-        print "payload:", payload
-        print "data:", data
+        print("payload:", payload)
+        print("data:", data)
         r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/notify"))
         data = r.json()
-        print "status for notify:", data
+        print("status for notify:", data)
         self.assertEqual(data['result'], 'Notification queued')
 
     def test_get_zone_with_symbols(self):
@@ -1172,7 +1173,7 @@ fred   IN  A      192.168.0.4
             self.url("/api/v1/servers/localhost/zones/" + name),
             data=json.dumps(payload),
             headers={'content-type': 'application/json'})
-        print r.content
+        print(r.content)
         self.assert_success(r)  # succeed so users can fix their wrong, old data
 
     def test_zone_delete(self):
@@ -1209,7 +1210,7 @@ fred   IN  A      192.168.0.4
         # records are still present
         data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
         serverset = get_rrset(data, name, 'NS')
-        print serverset
+        print(serverset)
         self.assertNotEquals(serverset['records'], [])
         self.assertNotEquals(serverset['comments'], [])
         # verify that modified_at has been set by pdns
@@ -1235,7 +1236,7 @@ fred   IN  A      192.168.0.4
         # make sure the NS records are still present
         data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
         serverset = get_rrset(data, name, 'NS')
-        print serverset
+        print(serverset)
         self.assertNotEquals(serverset['records'], [])
         self.assertEquals(serverset['comments'], [])
 
@@ -1283,7 +1284,7 @@ fred   IN  A      192.168.0.4
         # make sure the comments still exist
         data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
         serverset = get_rrset(data, name, 'NS')
-        print serverset
+        print(serverset)
         self.assertEquals(serverset['records'], rrset2['records'])
         self.assertEquals(serverset['comments'], rrset['comments'])
 
@@ -1306,7 +1307,7 @@ fred   IN  A      192.168.0.4
         self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
         r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
         revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
-        print revsets
+        print(revsets)
         self.assertEquals(revsets, [{
             u'name': u'44.4.2.192.in-addr.arpa.',
             u'ttl': 3600,
@@ -1345,7 +1346,7 @@ fred   IN  A      192.168.0.4
         self.assert_success(r)
         r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
         revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
-        print revsets
+        print(revsets)
         self.assertEquals(revsets, [{
             u'name': u'2.0.2.192.in-addr.arpa.',
             u'ttl': 3600,
@@ -1385,7 +1386,7 @@ fred   IN  A      192.168.0.4
         self.assert_success(r)
         r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
         revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
-        print revsets
+        print(revsets)
         self.assertEquals(revsets, [{
             u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
             u'ttl': 3600,
@@ -1404,7 +1405,7 @@ fred   IN  A      192.168.0.4
         self.create_zone(name=name, serial=22, soa_edit_api='')
         r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')))
         self.assert_success_json(r)
-        print r.json()
+        print(r.json())
         self.assertEquals(r.json(), [
             {u'object_type': u'zone', u'name': name, u'zone_id': name},
             {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
@@ -1419,25 +1420,27 @@ fred   IN  A      192.168.0.4
         ])
 
     def test_search_rr_substring(self):
-        name = 'search-rr-zone.name.'
+        name = unique_zone_name()
+        search = name[5:-5]
         self.create_zone(name=name)
-        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*rr-zone*"))
+        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
         self.assert_success_json(r)
-        print r.json()
+        print(r.json())
         # should return zone, SOA, ns1, ns2
         self.assertEquals(len(r.json()), 4)
 
     def test_search_rr_case_insensitive(self):
-        name = 'search-rr-insenszone.name.'
+        name = unique_zone_name()+'testsuffix.'
         self.create_zone(name=name)
-        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*rr-insensZONE*"))
+        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
         self.assert_success_json(r)
-        print r.json()
+        print(r.json())
         # should return zone, SOA, ns1, ns2
         self.assertEquals(len(r.json()), 4)
 
     def test_search_after_rectify_with_ent(self):
-        name = 'search-rectified.name.'
+        name = unique_zone_name()
+        search = name.split('.')[0]
         rrset = {
             "name": 'sub.sub.' + name,
             "type": "A",
@@ -1449,9 +1452,9 @@ fred   IN  A      192.168.0.4
         }
         self.create_zone(name=name, rrsets=[rrset])
         pdnsutil_rectify(name)
-        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*search-rectified*"))
+        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
         self.assert_success_json(r)
-        print r.json()
+        print(r.json())
         # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
         self.assertEquals(len(r.json()), 5)
 
@@ -1466,7 +1469,7 @@ fred   IN  A      192.168.0.4
             self.url("/api/v1/servers/localhost/zones?rrsets=false"),
             data=json.dumps(payload),
             headers={'content-type': 'application/json'})
-        print r.json()
+        print(r.json())
         self.assert_success_json(r)
         self.assertEquals(r.status_code, 201)
         self.assertEquals(r.json().get('rrsets'), None)
@@ -1476,7 +1479,7 @@ fred   IN  A      192.168.0.4
         self.create_zone(name=name, kind='Native')
         r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false"))
         self.assert_success_json(r)
-        print r.json()
+        print(r.json())
         self.assertEquals(r.json().get('rrsets'), None)
 
     def test_rrset_true_parameter(self):
@@ -1484,7 +1487,7 @@ fred   IN  A      192.168.0.4
         self.create_zone(name=name, kind='Native')
         r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true"))
         self.assert_success_json(r)
-        print r.json()
+        print(r.json())
         self.assertEquals(len(r.json().get('rrsets')), 2)
 
     def test_wrong_rrset_parameter(self):
@@ -1518,13 +1521,13 @@ class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
         )
         # Regression test: verify zone list works
         zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
-        print "zonelist:", zonelist
+        print("zonelist:", zonelist)
         self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
         # Also test that fetching the zone works.
-        print "id:", data['id']
+        print("id:", data['id'])
         self.assertEquals(data['id'], '=2E')
         data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
-        print "zone (fetched):", data
+        print("zone (fetched):", data)
         for k in ('name', 'kind'):
             self.assertIn(k, data)
             self.assertEquals(data[k], payload[k])
@@ -1599,7 +1602,7 @@ class RecursorZones(ApiTestCase):
             'servers': ['8.8.8.8'],
             'recursion_desired': False,
         }
-        print payload
+        print(payload)
         r = self.session.post(
             self.url("/api/v1/servers/localhost/zones"),
             data=json.dumps(payload),
@@ -1658,7 +1661,7 @@ class RecursorZones(ApiTestCase):
         self.create_zone(name=name, kind='Native')
         r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name))
         self.assert_success_json(r)
-        print r.json()
+        print(r.json())
         self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name}])
 
     def test_search_rr_substring(self):
@@ -1666,7 +1669,7 @@ class RecursorZones(ApiTestCase):
         self.create_zone(name=name, kind='Native')
         r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
         self.assert_success_json(r)
-        print r.json()
+        print(r.json())
         # should return zone, SOA
         self.assertEquals(len(r.json()), 2)
 
index 49c4b400d5ab1b5bb54cb88e5c4a30270b10efa7..e2ce4de8bdccd3dc35d3be4abd5a0142faa21ed3 100644 (file)
@@ -3,33 +3,39 @@ import json
 import unittest
 import os
 
-from test_helper import ApiTestCase, is_auth
+from test_helper import ApiTestCase, is_auth, pdnsutil, unique_zone_name
 
 @unittest.skipIf(not is_auth(), "Not applicable")
 class Cryptokeys(ApiTestCase):
 
-    def __init__(self, *args, **kwds):
-        super(Cryptokeys, self).__init__(*args, **kwds)
+    def setUp(self):
+        super(Cryptokeys, self).setUp()
         self.keyid = 0
-        self.zone = "cryptokeys.org"
+        self.zone = unique_zone_name()
+        self.zone_nodot = self.zone[:-1]
+        payload = {
+            'name': self.zone,
+            'kind': 'Native',
+            'nameservers': ['ns1.example.com.', 'ns2.example.com.']
+        }
+        r = self.session.post(
+            self.url("/api/v1/servers/localhost/zones"),
+            data=json.dumps(payload),
+            headers={'content-type': 'application/json'})
+        self.assert_success_json(r)
+        self.assertEquals(r.status_code, 201)
 
     def tearDown(self):
-        super(Cryptokeys,self).tearDown()
+        super(Cryptokeys, self).tearDown()
         self.remove_zone_key(self.keyid)
 
     # Adding a key to self.zone using the pdnsutil command
     def add_zone_key(self, status='inactive'):
-        try:
-            return subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "add-zone-key", self.zone, "ksk", status], stderr=open(os.devnull, 'wb'))
-        except subprocess.CalledProcessError as e:
-            self.fail("pdnsutil add-zone-key failed: "+e.output)
+        return pdnsutil("add-zone-key", self.zone_nodot, "ksk", status)
 
     # Removes a key from self.zone by id using the pdnsutil command
     def remove_zone_key(self, key_id):
-        try:
-            subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "remove-zone-key", self.zone, str(key_id)])
-        except subprocess.CalledProcessError as e:
-            self.fail("pdnsutil remove-zone-key failed: "+e.output)
+        return pdnsutil("remove-zone-key", self.zone_nodot, str(key_id))
 
     # This method tests the DELETE api call.
     def test_delete(self):
@@ -38,14 +44,11 @@ class Cryptokeys(ApiTestCase):
         #checks the status code. I don't know how to test explicit that the backend fail removing a key.
         r = self.session.delete(self.url("/api/v1/servers/localhost/zones/"+self.zone+"/cryptokeys/"+self.keyid))
         self.assertEquals(r.status_code, 200)
-        self.assertEquals(r.content, "")
+        self.assertEquals(r.content, b"")
 
         # Check that the key is actually deleted
-        try:
-            out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "list-keys", self.zone])
-            self.assertNotIn(self.zone, out)
-        except subprocess.CalledProcessError as e:
-            self.fail("pdnsutil list-keys failed: " + e.output)
+        out = pdnsutil("list-keys", self.zone)
+        self.assertNotIn(self.zone, out)
 
     def test_get_wrong_zone(self):
         self.keyid = self.add_zone_key()
@@ -64,25 +67,21 @@ class Cryptokeys(ApiTestCase):
         #checks for key is gone. Its ok even if no key had to be deleted. Or something went wrong with the backend.
         r = self.session.delete(self.url("/api/v1/servers/localhost/zones/"+self.zone+"/cryptokeys/"+self.keyid))
         self.assertEquals(r.status_code, 200)
-        self.assertEquals(r.content, "")
+        self.assertEquals(r.content, b"")
 
     # Prepares the json object for Post and sends it to the server
-    def add_key(self, content='', type='ksk', active='true' , algo='', bits=0):
-        if algo == '':
-            payload = {
-                'keytype': type,
-                'active' : active
-            }
-        else:
-            payload = {
-                'keytype': type,
-                'active' : active,
-                'algorithm' : algo
-            }
-        if bits > 0:
+    def add_key(self, content='', type='ksk', active='true', algo='', bits=None):
+        payload = {
+            'keytype': type,
+            'active': active,
+        }
+        if algo:
+            payload['algorithm'] = algo
+        if bits is not None:
             payload['bits'] = bits
         if content != '':
             payload['content'] = content
+        print("create key with payload:", payload)
         r = self.session.post(
             self.url("/api/v1/servers/localhost/zones/"+self.zone+"/cryptokeys"),
             data=json.dumps(payload),
@@ -91,7 +90,7 @@ class Cryptokeys(ApiTestCase):
         return r
 
     # Test POST for a positive result and delete the added key
-    def post_helper(self,content='', algo='', bits=0):
+    def post_helper(self, content='', algo='', bits=None):
         r = self.add_key(content=content, algo=algo, bits=bits)
         self.assert_success_json(r)
         self.assertEquals(r.status_code, 201)
@@ -100,11 +99,8 @@ class Cryptokeys(ApiTestCase):
         self.assertEquals(response['keytype'], 'csk')
         self.keyid = response['id']
         # Check if the key is actually added
-        try:
-            out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "list-keys", self.zone])
-            self.assertIn(self.zone, out)
-        except subprocess.CalledProcessError as e:
-            self.fail("pdnsutil list-keys failed: " + e.output)
+        out = pdnsutil("list-keys", self.zone_nodot)
+        self.assertIn(self.zone_nodot, out)
 
     # Test POST to add a key with default algorithm
     def test_post(self):
@@ -198,17 +194,14 @@ class Cryptokeys(ApiTestCase):
             data=json.dumps(payload),
             headers={'content-type': 'application/json'})
         self.assertEquals(r.status_code, 204)
-        self.assertEquals(r.content, "")
+        self.assertEquals(r.content, b"")
 
         # check if key is activated
-        try:
-            out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone])
-            self.assertIn("Active", out)
-        except subprocess.CalledProcessError as e:
-            self.fail("pdnsutil show-zone failed: " + e.output)
+        out = pdnsutil("show-zone", self.zone_nodot)
+        self.assertIn("Active", out)
 
     def test_put_deactivate_key(self):
-        self.keyid= self.add_zone_key(status='active')
+        self.keyid = self.add_zone_key(status='active')
         # deactivate key
         payload2 = {
             'active': False
@@ -219,14 +212,11 @@ class Cryptokeys(ApiTestCase):
             data=json.dumps(payload2),
             headers={'content-type': 'application/json'})
         self.assertEquals(r.status_code, 204)
-        self.assertEquals(r.content, "")
+        self.assertEquals(r.content, b"")
 
         # check if key is deactivated
-        try:
-            out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone])
-            self.assertIn("Inactive", out)
-        except subprocess.CalledProcessError as e:
-            self.fail("pdnsutil show-zone failed: " + e.output)
+        out = pdnsutil("show-zone", self.zone_nodot)
+        self.assertIn("Inactive", out)
 
     def test_put_deactivate_inactive_key(self):
         self.keyid = self.add_zone_key()
@@ -241,14 +231,11 @@ class Cryptokeys(ApiTestCase):
             data=json.dumps(payload),
             headers={'content-type': 'application/json'})
         self.assertEquals(r.status_code, 204)
-        self.assertEquals(r.content, "")
+        self.assertEquals(r.content, b"")
 
         # check if key is still deactivated
-        try:
-            out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone])
-            self.assertIn("Inactive", out)
-        except subprocess.CalledProcessError as e:
-            self.fail("pdnsutil show-zone failed: " + e.output)
+        out = pdnsutil("show-zone", self.zone_nodot)
+        self.assertIn("Inactive", out)
 
     def test_put_activate_active_key(self):
         self.keyid =self.add_zone_key(status='active')
@@ -262,11 +249,8 @@ class Cryptokeys(ApiTestCase):
             data=json.dumps(payload2),
             headers={'content-type': 'application/json'})
         self.assertEquals(r.status_code, 204)
-        self.assertEquals(r.content, "")
+        self.assertEquals(r.content, b"")
 
         # check if key is activated
-        try:
-            out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone])
-            self.assertIn("Active", out)
-        except subprocess.CalledProcessError as e:
-            self.fail("pdnsutil show-zone failed: " + e.output)
+        out = pdnsutil("show-zone", self.zone_nodot)
+        self.assertIn("Active", out)
index 14c2ea4c21bc289d1bca9cc21ac8a8c8feaddf79..8c1dfd3759a30202c98f6f38aea71c49fbd223ee 100644 (file)
@@ -1,10 +1,16 @@
 from datetime import datetime
 import os
 import requests
-import urlparse
 import unittest
 import sqlite3
 import subprocess
+import sys
+
+if sys.version_info[0] == 2:
+    from urlparse import urljoin
+else:
+    from urllib.parse import urljoin
+
 
 DAEMON = os.environ.get('DAEMON', 'authoritative')
 PDNSUTIL_CMD = os.environ.get('PDNSUTIL_CMD', 'NOT_SET BUT_THIS MIGHT_BE_A_LIST').split(' ')
@@ -22,13 +28,13 @@ class ApiTestCase(unittest.TestCase):
         self.session.headers = {'X-API-Key': os.environ.get('APIKEY', 'changeme-key'), 'Origin': 'http://%s:%s' % (self.server_address, self.server_port)}
 
     def url(self, relative_url):
-        return urlparse.urljoin(self.server_url, relative_url)
+        return urljoin(self.server_url, relative_url)
 
     def assert_success_json(self, result):
         try:
             result.raise_for_status()
         except:
-            print result.content
+            print(result.content)
             raise
         self.assertEquals(result.headers['Content-Type'], 'application/json')
 
@@ -40,7 +46,7 @@ class ApiTestCase(unittest.TestCase):
         try:
             result.raise_for_status()
         except:
-            print result.content
+            print(result.content)
             raise
 
 
@@ -70,10 +76,17 @@ def get_db_records(zonename, qtype):
                 SELECT id FROM domains WHERE name = ?
             )""", (qtype, zonename.rstrip('.'))).fetchall()
         recs = [{'name': row[0], 'type': row[1], 'content': row[2], 'ttl': row[3]} for row in rows]
-        print "DB Records:", recs
+        print("DB Records:", recs)
         return recs
 
 
+def pdnsutil(subcommand, *args):
+    try:
+        return subprocess.check_output(PDNSUTIL_CMD + [subcommand] + list(args), close_fds=True).decode('ascii')
+    except subprocess.CalledProcessError as except_inst:
+        raise RuntimeError("pdnsutil %s %s failed: %s" % (command, args, except_inst.output.decode('ascii', errors='replace')))
+
+
 def pdnsutil_rectify(zonename):
     """Run pdnsutil rectify-zone on the given zone."""
-    subprocess.check_call(PDNSUTIL_CMD + ['rectify-zone', zonename])
+    pdnsutil('rectify-zone', zonename)