]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
API tests: add get_zone helper
authorChris Hofstaedtler <chris.hofstaedtler@deduktiva.com>
Wed, 16 Nov 2022 11:29:18 +0000 (12:29 +0100)
committerChris Hofstaedtler <chris.hofstaedtler@deduktiva.com>
Fri, 11 Aug 2023 10:46:01 +0000 (12:46 +0200)
regression-tests.api/test_Zones.py

index 7440a2db3f740b96f8e9038a6abe24e6a8002dd5..2f5aa088155e221b734dd6ccc7ace96161045d68 100644 (file)
@@ -108,6 +108,29 @@ class AuthZonesHelperMixin(object):
         print("reply", reply)
         return name, payload, reply
 
+    def get_zone(self, api_zone_id, expect_error=None, **kwargs):
+        print("GET zone", api_zone_id, "args:", kwargs)
+        r = self.session.get(
+            self.url("/api/v1/servers/localhost/zones/" + api_zone_id),
+            params=kwargs
+        )
+
+        reply = r.json()
+        print("reply", reply)
+
+        if expect_error:
+            self.assertEqual(r.status_code, 422)
+            if expect_error is True:
+                pass
+            else:
+                self.assertIn(expect_error, r.json()['error'])
+        else:
+            # expect success
+            self.assert_success_json(r)
+            self.assertEqual(r.status_code, 200)
+
+        return reply
+
     def put_zone(self, api_zone_id, payload, expect_error=None):
         print("PUT zone", api_zone_id, "with:", payload)
         r = self.session.put(
@@ -227,8 +250,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
             self.url("/api/v1/servers/localhost/zones/" + data['id']),
             data=json.dumps(payload),
             headers={'content-type': 'application/json'})
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
-        data = r.json()
+        data = self.get_zone(data['id'])
         soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
         self.assertEqual(soa_serial[-2:], '02')
 
@@ -483,7 +505,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         name = unique_zone_name()
         name, payload, data = self.create_zone(dnssec=True)
 
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
+        self.get_zone(name)
 
         for k in ('dnssec', ):
             self.assertIn(k, data)
@@ -511,11 +533,8 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         name, payload, data = self.create_zone(dnssec=True)
 
         self.put_zone(name, {'dnssec': False})
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
-
-        zoneinfo = r.json()
 
-        self.assertEqual(r.status_code, 200)
+        zoneinfo = self.get_zone(name)
         self.assertEqual(zoneinfo['dnssec'], False)
 
         r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
@@ -533,7 +552,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         nsec3param = '1 0 100 aabbccddeeff'
         name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)
 
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
+        self.get_zone(name)
 
         for k in ('dnssec', 'nsec3param'):
             self.assertIn(k, data)
@@ -560,7 +579,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
                                                nsec3narrow=True)
 
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
+        self.get_zone(name)
 
         for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
             self.assertIn(k, data)
@@ -585,11 +604,8 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         name, payload, data = self.create_zone(dnssec=True,
                                                nsec3param='1 0 1 ab')
         self.put_zone(name, {'nsec3param': ''})
-        r = self.session.get(
-            self.url("/api/v1/servers/localhost/zones/" + name))
-        data = r.json()
 
-        self.assertEqual(r.status_code, 200)
+        data = self.get_zone(name)
         self.assertEqual(data['nsec3param'], '')
 
     def test_create_zone_without_dnssec_unset_nsec3parm(self):
@@ -617,25 +633,19 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         self.assertEqual(soa_serial[-2:], '01')
 
         self.put_zone(name, {'dnssec': True})
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
 
-        data = r.json()
+        data = self.get_zone(name)
         soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
-
-        self.assertEqual(r.status_code, 200)
         self.assertEqual(soa_serial[-2:], '02')
 
         self.put_zone(name, {'dnssec': False})
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
 
-        data = r.json()
+        data = self.get_zone(name)
         soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
-
-        self.assertEqual(r.status_code, 200)
         self.assertEqual(soa_serial[-2:], '03')
 
     def test_zone_absolute_url(self):
-        name, payload, data = self.create_zone()
+        self.create_zone()
         r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
         rdata = r.json()
         print(rdata[0])
@@ -712,8 +722,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         print("zonelist:", zonelist)
         self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
         # Also test that fetching the zone works.
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
-        data = r.json()
+        data = self.get_zone(data['id'])
         print("zone (fetched):", data)
         for k in ('name', 'masters', 'kind'):
             self.assertIn(k, data)
@@ -785,8 +794,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
         name = payload['name']
         zone_id = (name.replace('/', '=2F'))
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id))
-        data = r.json()
+        data = self.get_zone(zone_id)
         for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
             self.assertIn(k, data)
             if k in payload:
@@ -796,9 +804,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
         domains = r.json()
         example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id']))
-        self.assert_success_json(r)
-        data = r.json()
+        data = self.get_zone(example_com['id'])
         for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
             self.assertIn(k, data)
         self.assertEqual(data['name'], 'example.com.')
@@ -809,9 +815,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
 
         # verify single record from name that has a single record
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id'] + "?rrset_name=host-18000.example.com."))
-        self.assert_success_json(r)
-        data = r.json()
+        data = self.get_zone(example_com['id'], rrset_name="host-18000.example.com.")
         for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
             self.assertIn(k, data)
         self.assertEqual(data['rrsets'],
@@ -834,9 +838,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
 
         # verify two RRsets from a name that has two types with one record each
         powerdnssec_org = [domain for domain in domains if domain['name'] == u'powerdnssec.org.'][0]
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + powerdnssec_org['id'] + "?rrset_name=localhost.powerdnssec.org."))
-        self.assert_success_json(r)
-        data = r.json()
+        data = self.get_zone(powerdnssec_org['id'], rrset_name="localhost.powerdnssec.org.")
         for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
             self.assertIn(k, data)
         self.assertEqual(sorted(data['rrsets'], key=operator.itemgetter('type')),
@@ -871,9 +873,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin):
         )
 
         # verify one RRset with one record from a name that has two, then filtered by type
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + powerdnssec_org['id'] + "?rrset_name=localhost.powerdnssec.org.&rrset_type=AAAA"))
-        self.assert_success_json(r)
-        data = r.json()
+        data = self.get_zone(powerdnssec_org['id'], rrset_name="localhost.powerdnssec.org.", rrset_type="AAAA")
         for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
             self.assertIn(k, data)
         self.assertEqual(data['rrsets'],
@@ -1129,7 +1129,7 @@ $ORIGIN %NAME%
             'soa_edit': 'EPOCH'
         }
         self.put_zone(name, payload)
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         for k in payload.keys():
             self.assertIn(k, data)
             self.assertEqual(data[k], payload[k])
@@ -1141,7 +1141,7 @@ $ORIGIN %NAME%
             'soa_edit': ''
         }
         self.put_zone(name, payload)
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         for k in payload.keys():
             self.assertIn(k, data)
             self.assertEqual(data[k], payload[k])
@@ -1172,7 +1172,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # verify that (only) the new record is there
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         self.assertCountEqual(get_rrset(data, name, 'NS')['records'], rrset['records'])
 
     def test_zone_rr_update_mx(self):
@@ -1198,7 +1198,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # verify that (only) the new record is there
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset['records'])
 
     def test_zone_rr_update_invalid_mx(self):
@@ -1223,7 +1223,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assertEqual(r.status_code, 422)
         self.assertIn('non-hostname content', r.json()['error'])
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         self.assertIsNone(get_rrset(data, name, 'MX'))
 
     def test_zone_rr_update_opt(self):
@@ -1283,7 +1283,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # verify that all rrsets have been updated
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         self.assertEqual(get_rrset(data, name, 'NS')['records'], rrset1['records'])
         self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset2['records'])
 
@@ -1359,7 +1359,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # verify that the records are gone
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         self.assertIsNone(get_rrset(data, name, 'NS'))
 
     def test_zone_rr_update_rrset_combine_replace_and_delete(self):
@@ -1388,7 +1388,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # verify that (only) the new record is there
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         self.assertEqual(get_rrset(data, 'sub.' + name, 'CNAME')['records'], rrset2['records'])
 
     def test_zone_disable_reenable(self):
@@ -1414,7 +1414,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # check SOA serial has been edited
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         soa_serial1 = get_first_rec(data, name, 'SOA')['content'].split()[2]
         self.assertNotEqual(soa_serial1, '1')
         # make sure domain is still in zone list (disabled SOA!)
@@ -1432,7 +1432,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # check SOA serial has been edited again
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         soa_serial2 = get_first_rec(data, name, 'SOA')['content'].split()[2]
         self.assertNotEqual(soa_serial2, '1')
         self.assertNotEqual(soa_serial2, soa_serial1)
@@ -1650,7 +1650,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # verify that the new record is there
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         self.assertEqual(get_rrset(data, name, qtype)['records'], rrset['records'])
 
         rrset = {
@@ -1670,7 +1670,7 @@ $ORIGIN %NAME%
                                headers={'content-type': 'application/json'})
         self.assertEqual(r.status_code, 422)
         self.assertIn('only allowed at apex', r.json()['error'])
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         self.assertIsNone(get_rrset(data, 'sub.' + name, qtype))
 
     @parameterized.expand([
@@ -1695,7 +1695,7 @@ $ORIGIN %NAME%
                                headers={'content-type': 'application/json'})
         self.assertEqual(r.status_code, 422)
         self.assertIn('not allowed at apex', r.json()['error'])
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         self.assertIsNone(get_rrset(data, 'sub.' + name, qtype))
 
         rrset = {
@@ -1717,7 +1717,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # verify that the new record is there
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         self.assertEqual(get_rrset(data, 'sub.' + name, qtype)['records'], rrset['records'])
 
     def test_rr_svcb(self):
@@ -1885,7 +1885,7 @@ $ORIGIN %NAME%
             self.assert_success(r)
         # make sure the comments have been set, and that the NS
         # records are still present
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         serverset = get_rrset(data, name, 'NS')
         print(serverset)
         self.assertNotEqual(serverset['records'], [])
@@ -1911,7 +1911,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # make sure the NS records are still present
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         serverset = get_rrset(data, name, 'NS')
         print(serverset)
         self.assertNotEqual(serverset['records'], [])
@@ -1984,7 +1984,7 @@ $ORIGIN %NAME%
             headers={'content-type': 'application/json'})
         self.assert_success(r)
         # make sure the comments still exist
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+        data = self.get_zone(name)
         serverset = get_rrset(data, name, 'NS')
         print(serverset)
         self.assertEqual(serverset['records'], rrset2['records'])
@@ -2270,25 +2270,22 @@ $ORIGIN %NAME%
     def test_rrset_false_parameter(self):
         name = unique_zone_name()
         self.create_zone(name=name, kind='Native')
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false"))
-        self.assert_success_json(r)
-        print(r.json())
-        self.assertEqual(r.json().get('rrsets'), None)
+        data = self.get_zone(name, rrsets="false")
+        self.assertEqual(data.get('rrsets'), None)
 
     def test_rrset_true_parameter(self):
         name = unique_zone_name()
         self.create_zone(name=name, kind='Native')
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true"))
-        self.assert_success_json(r)
-        print(r.json())
-        self.assertEqual(len(r.json().get('rrsets')), 2)
+        data = self.get_zone(name, rrsets="true")
+        self.assertEqual(len(data['rrsets']), 2)
 
     def test_wrong_rrset_parameter(self):
         name = unique_zone_name()
         self.create_zone(name=name, kind='Native')
-        r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=foobar"))
-        self.assertEqual(r.status_code, 422)
-        self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
+        self.get_zone(
+            name, rrsets="foobar",
+            expect_error="'rrsets' request parameter value 'foobar' is not supported"
+        )
 
     def test_put_master_tsig_key_ids_non_existent(self):
         name = unique_zone_name()
@@ -2337,7 +2334,7 @@ class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
         # Also test that fetching the zone works.
         print("id:", data['id'])
         self.assertEqual(data['id'], '=2E')
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
+        data = self.get_zone(data['id'])
         print("zone (fetched):", data)
         for k in ('name', 'kind'):
             self.assertIn(k, data)
@@ -2355,7 +2352,7 @@ class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
             'soa_edit': 'EPOCH'
         }
         self.put_zone(zone_id, payload)
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
+        data = self.get_zone(zone_id)
         for k in payload.keys():
             self.assertIn(k, data)
             self.assertEqual(data[k], payload[k])
@@ -2366,7 +2363,7 @@ class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
             'soa_edit': ''
         }
         self.put_zone(zone_id, payload)
-        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
+        data = self.get_zone(zone_id)
         for k in payload.keys():
             self.assertIn(k, data)
             self.assertEqual(data[k], payload[k])