print("reply", reply)
return name, payload, reply
+ def get_zone(self, api_zone_id, expect_error=None, **kwargs):
+ print("GET zone", api_zone_id, "args:", kwargs)
+ r = self.session.get(
+ self.url("/api/v1/servers/localhost/zones/" + api_zone_id),
+ params=kwargs
+ )
+
+ reply = r.json()
+ print("reply", reply)
+
+ if expect_error:
+ self.assertEqual(r.status_code, 422)
+ if expect_error is True:
+ pass
+ else:
+ self.assertIn(expect_error, r.json()['error'])
+ else:
+ # expect success
+ self.assert_success_json(r)
+ self.assertEqual(r.status_code, 200)
+
+ return reply
+
def put_zone(self, api_zone_id, payload, expect_error=None):
print("PUT zone", api_zone_id, "with:", payload)
r = self.session.put(
self.url("/api/v1/servers/localhost/zones/" + data['id']),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
- data = r.json()
+ data = self.get_zone(data['id'])
soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
self.assertEqual(soa_serial[-2:], '02')
name = unique_zone_name()
name, payload, data = self.create_zone(dnssec=True)
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
+ self.get_zone(name)
for k in ('dnssec', ):
self.assertIn(k, data)
name, payload, data = self.create_zone(dnssec=True)
self.put_zone(name, {'dnssec': False})
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
-
- zoneinfo = r.json()
- self.assertEqual(r.status_code, 200)
+ zoneinfo = self.get_zone(name)
self.assertEqual(zoneinfo['dnssec'], False)
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
nsec3param = '1 0 100 aabbccddeeff'
name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
+ self.get_zone(name)
for k in ('dnssec', 'nsec3param'):
self.assertIn(k, data)
name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
nsec3narrow=True)
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
+ self.get_zone(name)
for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
self.assertIn(k, data)
name, payload, data = self.create_zone(dnssec=True,
nsec3param='1 0 1 ab')
self.put_zone(name, {'nsec3param': ''})
- r = self.session.get(
- self.url("/api/v1/servers/localhost/zones/" + name))
- data = r.json()
- self.assertEqual(r.status_code, 200)
+ data = self.get_zone(name)
self.assertEqual(data['nsec3param'], '')
def test_create_zone_without_dnssec_unset_nsec3parm(self):
self.assertEqual(soa_serial[-2:], '01')
self.put_zone(name, {'dnssec': True})
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
- data = r.json()
+ data = self.get_zone(name)
soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
-
- self.assertEqual(r.status_code, 200)
self.assertEqual(soa_serial[-2:], '02')
self.put_zone(name, {'dnssec': False})
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
- data = r.json()
+ data = self.get_zone(name)
soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
-
- self.assertEqual(r.status_code, 200)
self.assertEqual(soa_serial[-2:], '03')
def test_zone_absolute_url(self):
- name, payload, data = self.create_zone()
+ self.create_zone()
r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
rdata = r.json()
print(rdata[0])
print("zonelist:", zonelist)
self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
# Also test that fetching the zone works.
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
- data = r.json()
+ data = self.get_zone(data['id'])
print("zone (fetched):", data)
for k in ('name', 'masters', 'kind'):
self.assertIn(k, data)
name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
name = payload['name']
zone_id = (name.replace('/', '=2F'))
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id))
- data = r.json()
+ data = self.get_zone(zone_id)
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
self.assertIn(k, data)
if k in payload:
r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
domains = r.json()
example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id']))
- self.assert_success_json(r)
- data = r.json()
+ data = self.get_zone(example_com['id'])
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
self.assertIn(k, data)
self.assertEqual(data['name'], 'example.com.')
example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
# verify single record from name that has a single record
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id'] + "?rrset_name=host-18000.example.com."))
- self.assert_success_json(r)
- data = r.json()
+ data = self.get_zone(example_com['id'], rrset_name="host-18000.example.com.")
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
self.assertIn(k, data)
self.assertEqual(data['rrsets'],
# verify two RRsets from a name that has two types with one record each
powerdnssec_org = [domain for domain in domains if domain['name'] == u'powerdnssec.org.'][0]
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + powerdnssec_org['id'] + "?rrset_name=localhost.powerdnssec.org."))
- self.assert_success_json(r)
- data = r.json()
+ data = self.get_zone(powerdnssec_org['id'], rrset_name="localhost.powerdnssec.org.")
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
self.assertIn(k, data)
self.assertEqual(sorted(data['rrsets'], key=operator.itemgetter('type')),
)
# verify one RRset with one record from a name that has two, then filtered by type
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + powerdnssec_org['id'] + "?rrset_name=localhost.powerdnssec.org.&rrset_type=AAAA"))
- self.assert_success_json(r)
- data = r.json()
+ data = self.get_zone(powerdnssec_org['id'], rrset_name="localhost.powerdnssec.org.", rrset_type="AAAA")
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
self.assertIn(k, data)
self.assertEqual(data['rrsets'],
'soa_edit': 'EPOCH'
}
self.put_zone(name, payload)
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
for k in payload.keys():
self.assertIn(k, data)
self.assertEqual(data[k], payload[k])
'soa_edit': ''
}
self.put_zone(name, payload)
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
for k in payload.keys():
self.assertIn(k, data)
self.assertEqual(data[k], payload[k])
headers={'content-type': 'application/json'})
self.assert_success(r)
# verify that (only) the new record is there
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
self.assertCountEqual(get_rrset(data, name, 'NS')['records'], rrset['records'])
def test_zone_rr_update_mx(self):
headers={'content-type': 'application/json'})
self.assert_success(r)
# verify that (only) the new record is there
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset['records'])
def test_zone_rr_update_invalid_mx(self):
headers={'content-type': 'application/json'})
self.assertEqual(r.status_code, 422)
self.assertIn('non-hostname content', r.json()['error'])
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
self.assertIsNone(get_rrset(data, name, 'MX'))
def test_zone_rr_update_opt(self):
headers={'content-type': 'application/json'})
self.assert_success(r)
# verify that all rrsets have been updated
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
self.assertEqual(get_rrset(data, name, 'NS')['records'], rrset1['records'])
self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset2['records'])
headers={'content-type': 'application/json'})
self.assert_success(r)
# verify that the records are gone
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
self.assertIsNone(get_rrset(data, name, 'NS'))
def test_zone_rr_update_rrset_combine_replace_and_delete(self):
headers={'content-type': 'application/json'})
self.assert_success(r)
# verify that (only) the new record is there
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
self.assertEqual(get_rrset(data, 'sub.' + name, 'CNAME')['records'], rrset2['records'])
def test_zone_disable_reenable(self):
headers={'content-type': 'application/json'})
self.assert_success(r)
# check SOA serial has been edited
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
soa_serial1 = get_first_rec(data, name, 'SOA')['content'].split()[2]
self.assertNotEqual(soa_serial1, '1')
# make sure domain is still in zone list (disabled SOA!)
headers={'content-type': 'application/json'})
self.assert_success(r)
# check SOA serial has been edited again
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
soa_serial2 = get_first_rec(data, name, 'SOA')['content'].split()[2]
self.assertNotEqual(soa_serial2, '1')
self.assertNotEqual(soa_serial2, soa_serial1)
headers={'content-type': 'application/json'})
self.assert_success(r)
# verify that the new record is there
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
self.assertEqual(get_rrset(data, name, qtype)['records'], rrset['records'])
rrset = {
headers={'content-type': 'application/json'})
self.assertEqual(r.status_code, 422)
self.assertIn('only allowed at apex', r.json()['error'])
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
self.assertIsNone(get_rrset(data, 'sub.' + name, qtype))
@parameterized.expand([
headers={'content-type': 'application/json'})
self.assertEqual(r.status_code, 422)
self.assertIn('not allowed at apex', r.json()['error'])
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
self.assertIsNone(get_rrset(data, 'sub.' + name, qtype))
rrset = {
headers={'content-type': 'application/json'})
self.assert_success(r)
# verify that the new record is there
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
self.assertEqual(get_rrset(data, 'sub.' + name, qtype)['records'], rrset['records'])
def test_rr_svcb(self):
self.assert_success(r)
# make sure the comments have been set, and that the NS
# records are still present
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
serverset = get_rrset(data, name, 'NS')
print(serverset)
self.assertNotEqual(serverset['records'], [])
headers={'content-type': 'application/json'})
self.assert_success(r)
# make sure the NS records are still present
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
serverset = get_rrset(data, name, 'NS')
print(serverset)
self.assertNotEqual(serverset['records'], [])
headers={'content-type': 'application/json'})
self.assert_success(r)
# make sure the comments still exist
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
+ data = self.get_zone(name)
serverset = get_rrset(data, name, 'NS')
print(serverset)
self.assertEqual(serverset['records'], rrset2['records'])
def test_rrset_false_parameter(self):
name = unique_zone_name()
self.create_zone(name=name, kind='Native')
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false"))
- self.assert_success_json(r)
- print(r.json())
- self.assertEqual(r.json().get('rrsets'), None)
+ data = self.get_zone(name, rrsets="false")
+ self.assertEqual(data.get('rrsets'), None)
def test_rrset_true_parameter(self):
name = unique_zone_name()
self.create_zone(name=name, kind='Native')
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true"))
- self.assert_success_json(r)
- print(r.json())
- self.assertEqual(len(r.json().get('rrsets')), 2)
+ data = self.get_zone(name, rrsets="true")
+ self.assertEqual(len(data['rrsets']), 2)
def test_wrong_rrset_parameter(self):
name = unique_zone_name()
self.create_zone(name=name, kind='Native')
- r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=foobar"))
- self.assertEqual(r.status_code, 422)
- self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
+ self.get_zone(
+ name, rrsets="foobar",
+ expect_error="'rrsets' request parameter value 'foobar' is not supported"
+ )
def test_put_master_tsig_key_ids_non_existent(self):
name = unique_zone_name()
# Also test that fetching the zone works.
print("id:", data['id'])
self.assertEqual(data['id'], '=2E')
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
+ data = self.get_zone(data['id'])
print("zone (fetched):", data)
for k in ('name', 'kind'):
self.assertIn(k, data)
'soa_edit': 'EPOCH'
}
self.put_zone(zone_id, payload)
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
+ data = self.get_zone(zone_id)
for k in payload.keys():
self.assertIn(k, data)
self.assertEqual(data[k], payload[k])
'soa_edit': ''
}
self.put_zone(zone_id, payload)
- data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
+ data = self.get_zone(zone_id)
for k in payload.keys():
self.assertIn(k, data)
self.assertEqual(data[k], payload[k])