import json
import time
-import requests
import unittest
-from test_helper import ApiTestCase, unique_zone_name, isAuth, isRecursor
+from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor
class Zones(ApiTestCase):
- def test_ListZones(self):
+ def test_list_zones(self):
r = self.session.get(self.url("/servers/localhost/zones"))
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
domains = r.json()
example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
self.assertEquals(len(example_com), 1)
example_com = example_com[0]
required_fields = ['id', 'url', 'name', 'kind']
- if isAuth():
+ if is_auth():
required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial']
- elif isRecursor():
+ elif is_recursor():
required_fields = required_fields + ['recursion_desired', 'servers']
for field in required_fields:
self.assertIn(field, example_com)
-@unittest.skipIf(not isAuth(), "Not applicable")
+@unittest.skipIf(not is_auth(), "Not applicable")
class AuthZones(ApiTestCase):
def create_zone(self, name=None, **kwargs):
self.url("/servers/localhost/zones"),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
- return (payload, r.json())
+ self.assert_success_json(r)
+ return payload, r.json()
- def test_CreateZone(self):
+ def test_create_zone(self):
payload, data = self.create_zone(serial=22)
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api'):
self.assertIn(k, data)
# validate generated SOA
self.assertEquals(
[r['content'] for r in data['records'] if r['type'] == 'SOA'][0],
- "a.misconfigured.powerdns.server hostmaster."+payload['name']+" "+str(payload['serial'])+" 10800 3600 604800 3600"
+ "a.misconfigured.powerdns.server hostmaster." + payload['name'] + " " + str(payload['serial']) +
+ " 10800 3600 604800 3600"
)
- def test_CreateZoneWithSoaEditApi(self):
+ def test_create_zone_with_soa_edit_api(self):
# soa_edit_api wins over serial
payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
for k in ('soa_edit_api', ):
self.assertGreater(soa_serial, payload['serial'])
self.assertEquals(soa_serial, data['serial'])
- def test_CreateZoneWithRecords(self):
+ def test_create_zone_with_records(self):
name = unique_zone_name()
records = [
{
# check our record has appeared
self.assertEquals([r for r in data['records'] if r['type'] == records[0]['type']], records)
- def test_CreateZoneWithComments(self):
+ def test_create_zone_with_comments(self):
name = unique_zone_name()
comments = [
{
# check our comment has appeared
self.assertEquals(data['comments'], comments)
- def test_CreateZoneWithCustomSOA(self):
+ def test_create_zone_with_custom_soa(self):
name = unique_zone_name()
records = [
{
payload, data = self.create_zone(name=name, records=records)
self.assertEquals([r for r in data['records'] if r['type'] == records[0]['type']], records)
- def test_CreateZoneTrailingDot(self):
+ def test_create_zone_trailing_dot(self):
# Trailing dots should not end up in the zone name.
basename = unique_zone_name()
payload, data = self.create_zone(name=basename+'.')
self.assertEquals(data['name'], basename)
- def test_CreateZoneWithSymbols(self):
+ def test_create_zone_with_symbols(self):
payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
name = payload['name']
expected_id = (name.replace('/', '=2F')) + '.'
self.assertEquals(data[k], payload[k])
self.assertEquals(data['id'], expected_id)
- def test_CreateZoneWithNameserversNonString(self):
+ def test_create_zone_with_nameservers_non_string(self):
# ensure we don't crash
name = unique_zone_name()
payload = {
headers={'content-type': 'application/json'})
self.assertEquals(r.status_code, 422)
- def test_GetZoneWithSymbols(self):
+ def test_get_zone_with_symbols(self):
payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
name = payload['name']
zone_id = (name.replace('/', '=2F')) + '.'
r = self.session.get(self.url("/servers/localhost/zones/" + zone_id))
+ data = r.json()
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
self.assertIn(k, data)
if k in payload:
self.assertEquals(data[k], payload[k])
- def test_GetZone(self):
+ def test_get_zone(self):
r = self.session.get(self.url("/servers/localhost/zones"))
domains = r.json()
example_com = [domain for domain in domains if domain['name'] == u'example.com'][0]
r = self.session.get(self.url("/servers/localhost/zones/" + example_com['id']))
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
data = r.json()
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
self.assertIn(k, data)
self.assertEquals(data['name'], 'example.com')
- def test_ExportZoneJson(self):
+ def test_export_zone_json(self):
payload, zone = self.create_zone(nameservers=['ns1.foo.com', 'ns2.foo.com'])
name = payload['name']
# export it
self.url("/servers/localhost/zones/" + name + "/export"),
headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
)
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
data = r.json()
self.assertIn('zone', data)
- expected_data = [name+'.\t3600\tNS\tns1.foo.com.',name+'.\t3600\tNS\tns2.foo.com.',name+'.\t3600\tSOA\ta.misconfigured.powerdns.server. hostmaster.'+name+'. 0 10800 3600 604800 3600']
+ expected_data = [name + '.\t3600\tNS\tns1.foo.com.',
+ name + '.\t3600\tNS\tns2.foo.com.',
+ name + '.\t3600\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
+ '. 0 10800 3600 604800 3600']
self.assertEquals(data['zone'].strip().split('\n'), expected_data)
- def test_ExportZoneText(self):
+ def test_export_zone_text(self):
payload, zone = self.create_zone(nameservers=['ns1.foo.com', 'ns2.foo.com'])
name = payload['name']
# export it
headers={'accept': '*/*'}
)
data = r.text.strip().split("\n")
- expected_data = [name+'.\t3600\tNS\tns1.foo.com.',name+'.\t3600\tNS\tns2.foo.com.',name+'.\t3600\tSOA\ta.misconfigured.powerdns.server. hostmaster.'+name+'. 0 10800 3600 604800 3600']
+ expected_data = [name + '.\t3600\tNS\tns1.foo.com.',
+ name + '.\t3600\tNS\tns2.foo.com.',
+ name + '.\t3600\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
+ '. 0 10800 3600 604800 3600']
self.assertEquals(data, expected_data)
- def test_UpdateZone(self):
+ def test_update_zone(self):
payload, zone = self.create_zone()
name = payload['name']
# update, set as Master and enable SOA-EDIT-API
payload = {
'kind': 'Master',
- 'masters': ['192.0.2.1','192.0.2.2'],
+ 'masters': ['192.0.2.1', '192.0.2.2'],
'soa_edit_api': 'EPOCH'
}
r = self.session.put(
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
data = r.json()
for k in payload.keys():
self.assertIn(k, data)
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
data = r.json()
for k in payload.keys():
self.assertIn(k, data)
self.assertEquals(data[k], payload[k])
- def test_ZoneRRUpdate(self):
+ def test_zone_rr_update(self):
payload, zone = self.create_zone()
name = payload['name']
# do a replace (= update)
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
# verify that (only) the new record is there
r = self.session.get(self.url("/servers/localhost/zones/" + name))
data = r.json()['records']
recs = [rec for rec in data if rec['type'] == rrset['type'] and rec['name'] == rrset['name']]
self.assertEquals(recs, rrset['records'])
- def test_ZoneRRUpdateMX(self):
+ def test_zone_rr_update_mx(self):
# Important to test with MX records, as they have a priority field, which must not end up in the content field.
payload, zone = self.create_zone()
name = payload['name']
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
# verify that (only) the new record is there
r = self.session.get(self.url("/servers/localhost/zones/" + name))
data = r.json()['records']
recs = [rec for rec in data if rec['type'] == rrset['type'] and rec['name'] == rrset['name']]
self.assertEquals(recs, rrset['records'])
- def test_ZoneRRUpdateMultipleRRsets(self):
+ def test_zone_rr_update_multiple_rrsets(self):
payload, zone = self.create_zone()
name = payload['name']
rrset1 = {
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
# verify that all rrsets have been updated
r = self.session.get(self.url("/servers/localhost/zones/" + name))
data = r.json()['records']
recs2 = [rec for rec in data if rec['type'] == rrset2['type'] and rec['name'] == rrset2['name']]
self.assertEquals(recs2, rrset2['records'])
- def test_ZoneRRDelete(self):
+ def test_zone_rr_delete(self):
payload, zone = self.create_zone()
name = payload['name']
# do a delete of all NS records (these are created with the zone)
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
# verify that the records are gone
r = self.session.get(self.url("/servers/localhost/zones/" + name))
data = r.json()['records']
recs = [rec for rec in data if rec['type'] == rrset['type'] and rec['name'] == rrset['name']]
self.assertEquals(recs, [])
- def test_ZoneDisableReenable(self):
+ def test_zone_disable_reenable(self):
# This also tests that SOA-EDIT-API works.
payload, zone = self.create_zone(soa_edit_api='EPOCH')
name = payload['name']
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
# check SOA serial has been edited
print r.json()
soa_serial1 = [rec for rec in r.json()['records'] if rec['type'] == 'SOA'][0]['content'].split()[2]
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
# check SOA serial has been edited again
print r.json()
soa_serial2 = [rec for rec in r.json()['records'] if rec['type'] == 'SOA'][0]['content'].split()[2]
self.assertNotEquals(soa_serial2, '1')
self.assertNotEquals(soa_serial2, soa_serial1)
- def test_ZoneRRUpdateQTypeMismatch(self):
+ def test_zone_rr_update_qtype_mismatch(self):
payload, zone = self.create_zone()
name = payload['name']
# replace with qtype mismatch
headers={'content-type': 'application/json'})
self.assertEquals(r.status_code, 422)
- def test_ZoneRRUpdateQNameMismatch(self):
+ def test_zone_rr_update_qname_mismatch(self):
payload, zone = self.create_zone()
name = payload['name']
# replace with qname mismatch
headers={'content-type': 'application/json'})
self.assertEquals(r.status_code, 422)
- def test_ZoneRRUpdateOutOfZone(self):
+ def test_zone_rr_update_out_of_zone(self):
payload, zone = self.create_zone()
name = payload['name']
# replace with qname mismatch
self.assertEquals(r.status_code, 422)
self.assertIn('out of zone', r.json()['error'])
- def test_ZoneRRDeleteOutOfZone(self):
+ def test_zone_rr_delete_out_of_zone(self):
payload, zone = self.create_zone()
name = payload['name']
# replace with qname mismatch
self.assertEquals(r.status_code, 422)
self.assertIn('out of zone', r.json()['error'])
- def test_ZoneCommentCreate(self):
+ def test_zone_comment_create(self):
payload, zone = self.create_zone()
name = payload['name']
rrset = {
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
# make sure the comments have been set, and that the NS
# records are still present
r = self.session.get(self.url("/servers/localhost/zones/" + name))
# verify that modified_at has been set by pdns
self.assertNotEquals([c for c in data['comments']][0]['modified_at'], 0)
- def test_ZoneCommentDelete(self):
+ def test_zone_comment_delete(self):
# Test: Delete ONLY comments.
payload, zone = self.create_zone()
name = payload['name']
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
# make sure the NS records are still present
r = self.session.get(self.url("/servers/localhost/zones/" + name))
data = r.json()
self.assertNotEquals([r for r in data['records'] if r['type'] == 'NS'], [])
self.assertEquals(data['comments'], [])
- def test_ZoneCommentStayIntact(self):
+ def test_zone_comment_stay_intact(self):
# Test if comments on an rrset stay intact if the rrset is replaced
payload, zone = self.create_zone()
name = payload['name']
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
# replace rrset records
rrset2 = {
'changetype': 'replace',
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload2),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
# make sure the comments still exist
r = self.session.get(self.url("/servers/localhost/zones/" + name))
data = r.json()
self.assertEquals([r for r in data['records'] if r['type'] == 'NS'], rrset2['records'])
self.assertEquals(data['comments'], rrset['comments'])
- def test_ZoneAutoPtrIPv4(self):
+ def test_zone_auto_ptr_ipv4(self):
revzone = '0.2.192.in-addr.arpa'
self.create_zone(name=revzone)
payload, zone = self.create_zone()
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
r = self.session.get(self.url("/servers/localhost/zones/" + revzone))
recs = r.json()['records']
print recs
u'name': u'2.0.2.192.in-addr.arpa'
}])
- def test_ZoneAutoPtrIPv6(self):
+ def test_zone_auto_ptr_ipv6(self):
# 2001:DB8::bb:aa
revzone = '8.b.d.0.1.0.0.2.ip6.arpa'
self.create_zone(name=revzone)
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
r = self.session.get(self.url("/servers/localhost/zones/" + revzone))
recs = r.json()['records']
print recs
u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa'
}])
- def test_SearchRRExactZone(self):
+ def test_search_rr_exact_zone(self):
name = unique_zone_name()
self.create_zone(name=name)
r = self.session.get(self.url("/servers/localhost/search-data?q=" + name))
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
print r.json()
self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name+'.'}])
- def test_SearchRRSubstring(self):
+ def test_search_rr_substring(self):
name = 'search-rr-zone.name'
self.create_zone(name=name)
r = self.session.get(self.url("/servers/localhost/search-data?q=rr-zone"))
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
print r.json()
# should return zone, SOA, ns1, ns2
- self.assertEquals(len(r.json()), 1) # FIXME test disarmed for now (should be 4)
+ self.assertEquals(len(r.json()), 1) # FIXME test disarmed for now (should be 4)
- def test_SearchRRCaseInsensitive(self):
+ def test_search_rr_case_insensitive(self):
name = 'search-rr-insenszone.name'
self.create_zone(name=name)
r = self.session.get(self.url("/servers/localhost/search-data?q=rr-insensZONE"))
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
print r.json()
# should return zone, SOA, ns1, ns2
- self.assertEquals(len(r.json()), 1) # FIXME test disarmed for now (should be 4)
+ self.assertEquals(len(r.json()), 1) # FIXME test disarmed for now (should be 4)
-@unittest.skipIf(not isRecursor(), "Not applicable")
+@unittest.skipIf(not is_recursor(), "Not applicable")
class RecursorZones(ApiTestCase):
def create_zone(self, name=None, kind=None, rd=False, servers=None):
self.url("/servers/localhost/zones"),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
- return (payload, r.json())
+ self.assert_success_json(r)
+ return payload, r.json()
- def test_CreateAuthZone(self):
+ def test_create_auth_zone(self):
payload, data = self.create_zone(kind='Native')
# return values are normalized
payload['name'] += '.'
for k in payload.keys():
self.assertEquals(data[k], payload[k])
- def test_CreateForwardedZone(self):
+ def test_create_forwarded_zone(self):
payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
# return values are normalized
payload['servers'][0] += ':53'
for k in payload.keys():
self.assertEquals(data[k], payload[k])
- def test_CreateForwardedRDZone(self):
+ def test_create_forwarded_rd_zone(self):
payload, data = self.create_zone(name='google.com', kind='Forwarded', rd=True, servers=['8.8.8.8'])
# return values are normalized
payload['servers'][0] += ':53'
for k in payload.keys():
self.assertEquals(data[k], payload[k])
- def test_CreateAuthZoneWithSymbols(self):
+ def test_create_auth_zone_with_symbols(self):
payload, data = self.create_zone(name='foo/bar.'+unique_zone_name(), kind='Native')
# return values are normalized
payload['name'] += '.'
self.assertEquals(data[k], payload[k])
self.assertEquals(data['id'], expected_id)
- def test_RenameAuthZone(self):
+ def test_rename_auth_zone(self):
payload, data = self.create_zone(kind='Native')
name = payload['name'] + '.'
# now rename it
self.url("/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
data = r.json()
for k in payload.keys():
self.assertEquals(data[k], payload[k])
- def test_SearchRRExactZone(self):
+ def test_search_rr_exact_zone(self):
name = unique_zone_name() + '.'
self.create_zone(name=name, kind='Native')
r = self.session.get(self.url("/servers/localhost/search-data?q=" + name))
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
print r.json()
self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name}])
- def test_SearchRRSubstring(self):
+ def test_search_rr_substring(self):
name = 'search-rr-zone.name'
self.create_zone(name=name, kind='Native')
r = self.session.get(self.url("/servers/localhost/search-data?q=rr-zone"))
- self.assertSuccessJson(r)
+ self.assert_success_json(r)
print r.json()
# should return zone, SOA
self.assertEquals(len(r.json()), 2)