From c43b2d233f719fc27a350dc4f381b42eaf6f200e Mon Sep 17 00:00:00 2001 From: Chris Hofstaedtler Date: Wed, 16 Nov 2022 12:29:18 +0100 Subject: [PATCH] API tests: add get_zone helper --- regression-tests.api/test_Zones.py | 135 ++++++++++++++--------------- 1 file changed, 66 insertions(+), 69 deletions(-) diff --git a/regression-tests.api/test_Zones.py b/regression-tests.api/test_Zones.py index 7440a2db3f..2f5aa08815 100644 --- a/regression-tests.api/test_Zones.py +++ b/regression-tests.api/test_Zones.py @@ -108,6 +108,29 @@ class AuthZonesHelperMixin(object): print("reply", reply) return name, payload, reply + def get_zone(self, api_zone_id, expect_error=None, **kwargs): + print("GET zone", api_zone_id, "args:", kwargs) + r = self.session.get( + self.url("/api/v1/servers/localhost/zones/" + api_zone_id), + params=kwargs + ) + + reply = r.json() + print("reply", reply) + + if expect_error: + self.assertEqual(r.status_code, 422) + if expect_error is True: + pass + else: + self.assertIn(expect_error, r.json()['error']) + else: + # expect success + self.assert_success_json(r) + self.assertEqual(r.status_code, 200) + + return reply + def put_zone(self, api_zone_id, payload, expect_error=None): print("PUT zone", api_zone_id, "with:", payload) r = self.session.put( @@ -227,8 +250,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): self.url("/api/v1/servers/localhost/zones/" + data['id']), data=json.dumps(payload), headers={'content-type': 'application/json'}) - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])) - data = r.json() + data = self.get_zone(data['id']) soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2] self.assertEqual(soa_serial[-2:], '02') @@ -483,7 +505,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): name = unique_zone_name() name, payload, data = self.create_zone(dnssec=True) - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)) + self.get_zone(name) for k in ('dnssec', ): self.assertIn(k, data) @@ -511,11 +533,8 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): name, payload, data = self.create_zone(dnssec=True) self.put_zone(name, {'dnssec': False}) - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)) - - zoneinfo = r.json() - self.assertEqual(r.status_code, 200) + zoneinfo = self.get_zone(name) self.assertEqual(zoneinfo['dnssec'], False) r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys')) @@ -533,7 +552,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): nsec3param = '1 0 100 aabbccddeeff' name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param) - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)) + self.get_zone(name) for k in ('dnssec', 'nsec3param'): self.assertIn(k, data) @@ -560,7 +579,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param, nsec3narrow=True) - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)) + self.get_zone(name) for k in ('dnssec', 'nsec3param', 'nsec3narrow'): self.assertIn(k, data) @@ -585,11 +604,8 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): name, payload, data = self.create_zone(dnssec=True, nsec3param='1 0 1 ab') self.put_zone(name, {'nsec3param': ''}) - r = self.session.get( - self.url("/api/v1/servers/localhost/zones/" + name)) - data = r.json() - self.assertEqual(r.status_code, 200) + data = self.get_zone(name) self.assertEqual(data['nsec3param'], '') def test_create_zone_without_dnssec_unset_nsec3parm(self): @@ -617,25 +633,19 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): self.assertEqual(soa_serial[-2:], '01') self.put_zone(name, {'dnssec': True}) - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)) - data = r.json() + data = self.get_zone(name) soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2] - - self.assertEqual(r.status_code, 200) self.assertEqual(soa_serial[-2:], '02') self.put_zone(name, {'dnssec': False}) - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)) - data = r.json() + data = self.get_zone(name) soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2] - - self.assertEqual(r.status_code, 200) self.assertEqual(soa_serial[-2:], '03') def test_zone_absolute_url(self): - name, payload, data = self.create_zone() + self.create_zone() r = self.session.get(self.url("/api/v1/servers/localhost/zones")) rdata = r.json() print(rdata[0]) @@ -712,8 +722,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): print("zonelist:", zonelist) self.assertIn(payload['name'], [zone['name'] for zone in zonelist]) # Also test that fetching the zone works. - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])) - data = r.json() + data = self.get_zone(data['id']) print("zone (fetched):", data) for k in ('name', 'masters', 'kind'): self.assertIn(k, data) @@ -785,8 +794,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name()) name = payload['name'] zone_id = (name.replace('/', '=2F')) - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)) - data = r.json() + data = self.get_zone(zone_id) for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'): self.assertIn(k, data) if k in payload: @@ -796,9 +804,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): r = self.session.get(self.url("/api/v1/servers/localhost/zones")) domains = r.json() example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0] - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id'])) - self.assert_success_json(r) - data = r.json() + data = self.get_zone(example_com['id']) for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'): self.assertIn(k, data) self.assertEqual(data['name'], 'example.com.') @@ -809,9 +815,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0] # verify single record from name that has a single record - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id'] + "?rrset_name=host-18000.example.com.")) - self.assert_success_json(r) - data = r.json() + data = self.get_zone(example_com['id'], rrset_name="host-18000.example.com.") for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'): self.assertIn(k, data) self.assertEqual(data['rrsets'], @@ -834,9 +838,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): # verify two RRsets from a name that has two types with one record each powerdnssec_org = [domain for domain in domains if domain['name'] == u'powerdnssec.org.'][0] - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + powerdnssec_org['id'] + "?rrset_name=localhost.powerdnssec.org.")) - self.assert_success_json(r) - data = r.json() + data = self.get_zone(powerdnssec_org['id'], rrset_name="localhost.powerdnssec.org.") for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'): self.assertIn(k, data) self.assertEqual(sorted(data['rrsets'], key=operator.itemgetter('type')), @@ -871,9 +873,7 @@ class AuthZones(ApiTestCase, AuthZonesHelperMixin): ) # verify one RRset with one record from a name that has two, then filtered by type - r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + powerdnssec_org['id'] + "?rrset_name=localhost.powerdnssec.org.&rrset_type=AAAA")) - self.assert_success_json(r) - data = r.json() + data = self.get_zone(powerdnssec_org['id'], rrset_name="localhost.powerdnssec.org.", rrset_type="AAAA") for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'): self.assertIn(k, data) self.assertEqual(data['rrsets'], @@ -1129,7 +1129,7 @@ $ORIGIN %NAME% 'soa_edit': 'EPOCH' } self.put_zone(name, payload) - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) for k in payload.keys(): self.assertIn(k, data) self.assertEqual(data[k], payload[k]) @@ -1141,7 +1141,7 @@ $ORIGIN %NAME% 'soa_edit': '' } self.put_zone(name, payload) - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) for k in payload.keys(): self.assertIn(k, data) self.assertEqual(data[k], payload[k]) @@ -1172,7 +1172,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # verify that (only) the new record is there - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) self.assertCountEqual(get_rrset(data, name, 'NS')['records'], rrset['records']) def test_zone_rr_update_mx(self): @@ -1198,7 +1198,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # verify that (only) the new record is there - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset['records']) def test_zone_rr_update_invalid_mx(self): @@ -1223,7 +1223,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assertEqual(r.status_code, 422) self.assertIn('non-hostname content', r.json()['error']) - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) self.assertIsNone(get_rrset(data, name, 'MX')) def test_zone_rr_update_opt(self): @@ -1283,7 +1283,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # verify that all rrsets have been updated - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) self.assertEqual(get_rrset(data, name, 'NS')['records'], rrset1['records']) self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset2['records']) @@ -1359,7 +1359,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # verify that the records are gone - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) self.assertIsNone(get_rrset(data, name, 'NS')) def test_zone_rr_update_rrset_combine_replace_and_delete(self): @@ -1388,7 +1388,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # verify that (only) the new record is there - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) self.assertEqual(get_rrset(data, 'sub.' + name, 'CNAME')['records'], rrset2['records']) def test_zone_disable_reenable(self): @@ -1414,7 +1414,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # check SOA serial has been edited - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) soa_serial1 = get_first_rec(data, name, 'SOA')['content'].split()[2] self.assertNotEqual(soa_serial1, '1') # make sure domain is still in zone list (disabled SOA!) @@ -1432,7 +1432,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # check SOA serial has been edited again - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) soa_serial2 = get_first_rec(data, name, 'SOA')['content'].split()[2] self.assertNotEqual(soa_serial2, '1') self.assertNotEqual(soa_serial2, soa_serial1) @@ -1650,7 +1650,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # verify that the new record is there - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) self.assertEqual(get_rrset(data, name, qtype)['records'], rrset['records']) rrset = { @@ -1670,7 +1670,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assertEqual(r.status_code, 422) self.assertIn('only allowed at apex', r.json()['error']) - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) self.assertIsNone(get_rrset(data, 'sub.' + name, qtype)) @parameterized.expand([ @@ -1695,7 +1695,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assertEqual(r.status_code, 422) self.assertIn('not allowed at apex', r.json()['error']) - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) self.assertIsNone(get_rrset(data, 'sub.' + name, qtype)) rrset = { @@ -1717,7 +1717,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # verify that the new record is there - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) self.assertEqual(get_rrset(data, 'sub.' + name, qtype)['records'], rrset['records']) def test_rr_svcb(self): @@ -1885,7 +1885,7 @@ $ORIGIN %NAME% self.assert_success(r) # make sure the comments have been set, and that the NS # records are still present - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) serverset = get_rrset(data, name, 'NS') print(serverset) self.assertNotEqual(serverset['records'], []) @@ -1911,7 +1911,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # make sure the NS records are still present - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) serverset = get_rrset(data, name, 'NS') print(serverset) self.assertNotEqual(serverset['records'], []) @@ -1984,7 +1984,7 @@ $ORIGIN %NAME% headers={'content-type': 'application/json'}) self.assert_success(r) # make sure the comments still exist - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json() + data = self.get_zone(name) serverset = get_rrset(data, name, 'NS') print(serverset) self.assertEqual(serverset['records'], rrset2['records']) @@ -2270,25 +2270,22 @@ $ORIGIN %NAME% def test_rrset_false_parameter(self): name = unique_zone_name() self.create_zone(name=name, kind='Native') - r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false")) - self.assert_success_json(r) - print(r.json()) - self.assertEqual(r.json().get('rrsets'), None) + data = self.get_zone(name, rrsets="false") + self.assertEqual(data.get('rrsets'), None) def test_rrset_true_parameter(self): name = unique_zone_name() self.create_zone(name=name, kind='Native') - r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true")) - self.assert_success_json(r) - print(r.json()) - self.assertEqual(len(r.json().get('rrsets')), 2) + data = self.get_zone(name, rrsets="true") + self.assertEqual(len(data['rrsets']), 2) def test_wrong_rrset_parameter(self): name = unique_zone_name() self.create_zone(name=name, kind='Native') - r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=foobar")) - self.assertEqual(r.status_code, 422) - self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error']) + self.get_zone( + name, rrsets="foobar", + expect_error="'rrsets' request parameter value 'foobar' is not supported" + ) def test_put_master_tsig_key_ids_non_existent(self): name = unique_zone_name() @@ -2337,7 +2334,7 @@ class AuthRootZone(ApiTestCase, AuthZonesHelperMixin): # Also test that fetching the zone works. print("id:", data['id']) self.assertEqual(data['id'], '=2E') - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json() + data = self.get_zone(data['id']) print("zone (fetched):", data) for k in ('name', 'kind'): self.assertIn(k, data) @@ -2355,7 +2352,7 @@ class AuthRootZone(ApiTestCase, AuthZonesHelperMixin): 'soa_edit': 'EPOCH' } self.put_zone(zone_id, payload) - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json() + data = self.get_zone(zone_id) for k in payload.keys(): self.assertIn(k, data) self.assertEqual(data[k], payload[k]) @@ -2366,7 +2363,7 @@ class AuthRootZone(ApiTestCase, AuthZonesHelperMixin): 'soa_edit': '' } self.put_zone(zone_id, payload) - data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json() + data = self.get_zone(zone_id) for k in payload.keys(): self.assertIn(k, data) self.assertEqual(data[k], payload[k]) -- 2.47.2