+from __future__ import print_function
import json
import time
import unittest
def eq_zone_rrsets(rrsets, expected):
data_got = {}
data_expected = {}
- for type_, expected_records in expected.iteritems():
+ for type_, expected_records in expected.items():
type_ = str(type_)
data_got[type_] = set()
data_expected[type_] = set()
uses_name = any(['name' in expected_record for expected_record in expected_records])
# minify + convert received data
for rrset in [rrset for rrset in rrsets if rrset['type'] == type_]:
- print rrset
+ print(rrset)
for r in rrset['records']:
data_got[type_].add((rrset['name'] if uses_name else '@', rrset['type'], r['content']))
# minify expected data
for r in expected_records:
data_expected[type_].add((r['name'] if uses_name else '@', type_, r['content']))
- print "eq_zone_rrsets: got:"
+ print("eq_zone_rrsets: got:")
pprint(data_got)
- print "eq_zone_rrsets: expected:"
+ print("eq_zone_rrsets: expected:")
pprint(data_expected)
assert data_got == data_expected, "%r != %r" % (data_got, data_expected)
del payload[k]
else:
payload[k] = v
- print "sending", payload
+ print("sending", payload)
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
self.assert_success_json(r)
self.assertEquals(r.status_code, 201)
reply = r.json()
- print "reply", reply
+ print("reply", reply)
return name, payload, reply
if k in payload:
self.assertEquals(data[k], payload[k])
# generated EPOCH serial surely is > fixed serial we passed in
- print data
+ print(data)
self.assertGreater(data['serial'], payload['serial'])
soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
self.assertGreater(soa_serial, payload['serial'])
def test_create_zone_with_account(self):
# soa_edit_api wins over serial
name, payload, data = self.create_zone(account='anaccount', serial=10)
- print data
+ print(data)
for k in ('account', ):
self.assertIn(k, data)
if k in payload:
def test_create_zone_default_soa_edit_api(self):
name, payload, data = self.create_zone()
- print data
+ print(data)
self.assertEquals(data['soa_edit_api'], 'DEFAULT')
def test_create_zone_with_soa_edit(self):
name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
- print data
+ print(data)
self.assertEquals(data['soa_edit'], 'INCEPTION-INCREMENT')
self.assertEquals(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
'kind': 'Native',
'nameservers': ['uncanon.example.com']
}
- print payload
+ print(payload)
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
'name': '',
'kind': 'Native',
}
- print payload
+ print(payload)
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
'kind': 'Native',
'nameservers': ['ns1.example.com.']
}
- print payload
+ print(payload)
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
'kind': 'Native',
'nameservers': ['ns1.example.com']
}
- print payload
+ print(payload)
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
'nameservers': ['ns1.example.com.'],
'rrsets': [rrset],
}
- print payload
+ print(payload)
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
'nameservers': ['ns1.example.com.'],
'rrsets': [rrset],
}
- print payload
+ print(payload)
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
'kind': 'Native',
'nameservers': [{'a': 'ns1.example.com'}] # invalid
}
- print payload
+ print(payload)
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
keys = r.json()
- print keys
+ print(keys)
self.assertEquals(r.status_code, 200)
self.assertEquals(len(keys), 1)
data = r.json()
- print data
+ print(data)
self.assertEquals(r.status_code, 200)
self.assertEquals(len(data['metadata']), 1)
data = r.json()
- print data
+ print(data)
self.assertEquals(r.status_code, 200)
self.assertEquals(len(data['metadata']), 1)
for k in ('name', 'masters', 'kind'):
self.assertIn(k, data)
self.assertEquals(data[k], payload[k])
- print "payload:", payload
- print "data:", data
+ print("payload:", payload)
+ print("data:", data)
# Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
zonelist = r.json()
- print "zonelist:", zonelist
+ print("zonelist:", zonelist)
self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
# Also test that fetching the zone works.
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
data = r.json()
- print "zone (fetched):", data
+ print("zone (fetched):", data)
for k in ('name', 'masters', 'kind'):
self.assertIn(k, data)
self.assertEquals(data[k], payload[k])
def test_retrieve_slave_zone(self):
name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
- print "payload:", payload
- print "data:", data
+ print("payload:", payload)
+ print("data:", data)
r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/axfr-retrieve"))
data = r.json()
- print "status for axfr-retrieve:", data
+ print("status for axfr-retrieve:", data)
self.assertEqual(data['result'], u'Added retrieval request for \'' + payload['name'] +
'\' from master 127.0.0.2')
def test_notify_master_zone(self):
name, payload, data = self.create_zone(kind='Master')
- print "payload:", payload
- print "data:", data
+ print("payload:", payload)
+ print("data:", data)
r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/notify"))
data = r.json()
- print "status for notify:", data
+ print("status for notify:", data)
self.assertEqual(data['result'], 'Notification queued')
def test_get_zone_with_symbols(self):
self.url("/api/v1/servers/localhost/zones/" + name),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- print r.content
+ print(r.content)
self.assert_success(r) # succeed so users can fix their wrong, old data
def test_zone_delete(self):
# records are still present
data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
serverset = get_rrset(data, name, 'NS')
- print serverset
+ print(serverset)
self.assertNotEquals(serverset['records'], [])
self.assertNotEquals(serverset['comments'], [])
# verify that modified_at has been set by pdns
# make sure the NS records are still present
data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
serverset = get_rrset(data, name, 'NS')
- print serverset
+ print(serverset)
self.assertNotEquals(serverset['records'], [])
self.assertEquals(serverset['comments'], [])
# make sure the comments still exist
data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
serverset = get_rrset(data, name, 'NS')
- print serverset
+ print(serverset)
self.assertEquals(serverset['records'], rrset2['records'])
self.assertEquals(serverset['comments'], rrset['comments'])
self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
- print revsets
+ print(revsets)
self.assertEquals(revsets, [{
u'name': u'44.4.2.192.in-addr.arpa.',
u'ttl': 3600,
self.assert_success(r)
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
- print revsets
+ print(revsets)
self.assertEquals(revsets, [{
u'name': u'2.0.2.192.in-addr.arpa.',
u'ttl': 3600,
self.assert_success(r)
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
- print revsets
+ print(revsets)
self.assertEquals(revsets, [{
u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
u'ttl': 3600,
self.create_zone(name=name, serial=22, soa_edit_api='')
r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')))
self.assert_success_json(r)
- print r.json()
+ print(r.json())
self.assertEquals(r.json(), [
{u'object_type': u'zone', u'name': name, u'zone_id': name},
{u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
])
def test_search_rr_substring(self):
- name = 'search-rr-zone.name.'
+ name = unique_zone_name()
+ search = name[5:-5]
self.create_zone(name=name)
- r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*rr-zone*"))
+ r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
self.assert_success_json(r)
- print r.json()
+ print(r.json())
# should return zone, SOA, ns1, ns2
self.assertEquals(len(r.json()), 4)
def test_search_rr_case_insensitive(self):
- name = 'search-rr-insenszone.name.'
+ name = unique_zone_name()+'testsuffix.'
self.create_zone(name=name)
- r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*rr-insensZONE*"))
+ r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
self.assert_success_json(r)
- print r.json()
+ print(r.json())
# should return zone, SOA, ns1, ns2
self.assertEquals(len(r.json()), 4)
def test_search_after_rectify_with_ent(self):
- name = 'search-rectified.name.'
+ name = unique_zone_name()
+ search = name.split('.')[0]
rrset = {
"name": 'sub.sub.' + name,
"type": "A",
}
self.create_zone(name=name, rrsets=[rrset])
pdnsutil_rectify(name)
- r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*search-rectified*"))
+ r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
self.assert_success_json(r)
- print r.json()
+ print(r.json())
# should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
self.assertEquals(len(r.json()), 5)
self.url("/api/v1/servers/localhost/zones?rrsets=false"),
data=json.dumps(payload),
headers={'content-type': 'application/json'})
- print r.json()
+ print(r.json())
self.assert_success_json(r)
self.assertEquals(r.status_code, 201)
self.assertEquals(r.json().get('rrsets'), None)
self.create_zone(name=name, kind='Native')
r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false"))
self.assert_success_json(r)
- print r.json()
+ print(r.json())
self.assertEquals(r.json().get('rrsets'), None)
def test_rrset_true_parameter(self):
self.create_zone(name=name, kind='Native')
r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true"))
self.assert_success_json(r)
- print r.json()
+ print(r.json())
self.assertEquals(len(r.json().get('rrsets')), 2)
def test_wrong_rrset_parameter(self):
)
# Regression test: verify zone list works
zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
- print "zonelist:", zonelist
+ print("zonelist:", zonelist)
self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
# Also test that fetching the zone works.
- print "id:", data['id']
+ print("id:", data['id'])
self.assertEquals(data['id'], '=2E')
data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
- print "zone (fetched):", data
+ print("zone (fetched):", data)
for k in ('name', 'kind'):
self.assertIn(k, data)
self.assertEquals(data[k], payload[k])
'servers': ['8.8.8.8'],
'recursion_desired': False,
}
- print payload
+ print(payload)
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
self.create_zone(name=name, kind='Native')
r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name))
self.assert_success_json(r)
- print r.json()
+ print(r.json())
self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name}])
def test_search_rr_substring(self):
self.create_zone(name=name, kind='Native')
r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
self.assert_success_json(r)
- print r.json()
+ print(r.json())
# should return zone, SOA
self.assertEquals(len(r.json()), 2)
import unittest
import os
-from test_helper import ApiTestCase, is_auth
+from test_helper import ApiTestCase, is_auth, pdnsutil, unique_zone_name
@unittest.skipIf(not is_auth(), "Not applicable")
class Cryptokeys(ApiTestCase):
- def __init__(self, *args, **kwds):
- super(Cryptokeys, self).__init__(*args, **kwds)
+ def setUp(self):
+ super(Cryptokeys, self).setUp()
self.keyid = 0
- self.zone = "cryptokeys.org"
+ self.zone = unique_zone_name()
+ self.zone_nodot = self.zone[:-1]
+ payload = {
+ 'name': self.zone,
+ 'kind': 'Native',
+ 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
+ }
+ r = self.session.post(
+ self.url("/api/v1/servers/localhost/zones"),
+ data=json.dumps(payload),
+ headers={'content-type': 'application/json'})
+ self.assert_success_json(r)
+ self.assertEquals(r.status_code, 201)
def tearDown(self):
- super(Cryptokeys,self).tearDown()
+ super(Cryptokeys, self).tearDown()
self.remove_zone_key(self.keyid)
# Adding a key to self.zone using the pdnsutil command
def add_zone_key(self, status='inactive'):
- try:
- return subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "add-zone-key", self.zone, "ksk", status], stderr=open(os.devnull, 'wb'))
- except subprocess.CalledProcessError as e:
- self.fail("pdnsutil add-zone-key failed: "+e.output)
+ return pdnsutil("add-zone-key", self.zone_nodot, "ksk", status)
# Removes a key from self.zone by id using the pdnsutil command
def remove_zone_key(self, key_id):
- try:
- subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "remove-zone-key", self.zone, str(key_id)])
- except subprocess.CalledProcessError as e:
- self.fail("pdnsutil remove-zone-key failed: "+e.output)
+ return pdnsutil("remove-zone-key", self.zone_nodot, str(key_id))
# This method tests the DELETE api call.
def test_delete(self):
#checks the status code. I don't know how to test explicit that the backend fail removing a key.
r = self.session.delete(self.url("/api/v1/servers/localhost/zones/"+self.zone+"/cryptokeys/"+self.keyid))
self.assertEquals(r.status_code, 200)
- self.assertEquals(r.content, "")
+ self.assertEquals(r.content, b"")
# Check that the key is actually deleted
- try:
- out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "list-keys", self.zone])
- self.assertNotIn(self.zone, out)
- except subprocess.CalledProcessError as e:
- self.fail("pdnsutil list-keys failed: " + e.output)
+ out = pdnsutil("list-keys", self.zone)
+ self.assertNotIn(self.zone, out)
def test_get_wrong_zone(self):
self.keyid = self.add_zone_key()
#checks for key is gone. Its ok even if no key had to be deleted. Or something went wrong with the backend.
r = self.session.delete(self.url("/api/v1/servers/localhost/zones/"+self.zone+"/cryptokeys/"+self.keyid))
self.assertEquals(r.status_code, 200)
- self.assertEquals(r.content, "")
+ self.assertEquals(r.content, b"")
# Prepares the json object for Post and sends it to the server
- def add_key(self, content='', type='ksk', active='true' , algo='', bits=0):
- if algo == '':
- payload = {
- 'keytype': type,
- 'active' : active
- }
- else:
- payload = {
- 'keytype': type,
- 'active' : active,
- 'algorithm' : algo
- }
- if bits > 0:
+ def add_key(self, content='', type='ksk', active='true', algo='', bits=None):
+ payload = {
+ 'keytype': type,
+ 'active': active,
+ }
+ if algo:
+ payload['algorithm'] = algo
+ if bits is not None:
payload['bits'] = bits
if content != '':
payload['content'] = content
+ print("create key with payload:", payload)
r = self.session.post(
self.url("/api/v1/servers/localhost/zones/"+self.zone+"/cryptokeys"),
data=json.dumps(payload),
return r
# Test POST for a positive result and delete the added key
- def post_helper(self,content='', algo='', bits=0):
+ def post_helper(self, content='', algo='', bits=None):
r = self.add_key(content=content, algo=algo, bits=bits)
self.assert_success_json(r)
self.assertEquals(r.status_code, 201)
self.assertEquals(response['keytype'], 'csk')
self.keyid = response['id']
# Check if the key is actually added
- try:
- out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "list-keys", self.zone])
- self.assertIn(self.zone, out)
- except subprocess.CalledProcessError as e:
- self.fail("pdnsutil list-keys failed: " + e.output)
+ out = pdnsutil("list-keys", self.zone_nodot)
+ self.assertIn(self.zone_nodot, out)
# Test POST to add a key with default algorithm
def test_post(self):
data=json.dumps(payload),
headers={'content-type': 'application/json'})
self.assertEquals(r.status_code, 204)
- self.assertEquals(r.content, "")
+ self.assertEquals(r.content, b"")
# check if key is activated
- try:
- out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone])
- self.assertIn("Active", out)
- except subprocess.CalledProcessError as e:
- self.fail("pdnsutil show-zone failed: " + e.output)
+ out = pdnsutil("show-zone", self.zone_nodot)
+ self.assertIn("Active", out)
def test_put_deactivate_key(self):
- self.keyid= self.add_zone_key(status='active')
+ self.keyid = self.add_zone_key(status='active')
# deactivate key
payload2 = {
'active': False
data=json.dumps(payload2),
headers={'content-type': 'application/json'})
self.assertEquals(r.status_code, 204)
- self.assertEquals(r.content, "")
+ self.assertEquals(r.content, b"")
# check if key is deactivated
- try:
- out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone])
- self.assertIn("Inactive", out)
- except subprocess.CalledProcessError as e:
- self.fail("pdnsutil show-zone failed: " + e.output)
+ out = pdnsutil("show-zone", self.zone_nodot)
+ self.assertIn("Inactive", out)
def test_put_deactivate_inactive_key(self):
self.keyid = self.add_zone_key()
data=json.dumps(payload),
headers={'content-type': 'application/json'})
self.assertEquals(r.status_code, 204)
- self.assertEquals(r.content, "")
+ self.assertEquals(r.content, b"")
# check if key is still deactivated
- try:
- out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone])
- self.assertIn("Inactive", out)
- except subprocess.CalledProcessError as e:
- self.fail("pdnsutil show-zone failed: " + e.output)
+ out = pdnsutil("show-zone", self.zone_nodot)
+ self.assertIn("Inactive", out)
def test_put_activate_active_key(self):
self.keyid =self.add_zone_key(status='active')
data=json.dumps(payload2),
headers={'content-type': 'application/json'})
self.assertEquals(r.status_code, 204)
- self.assertEquals(r.content, "")
+ self.assertEquals(r.content, b"")
# check if key is activated
- try:
- out = subprocess.check_output(["../pdns/pdnsutil", "--config-dir=.", "show-zone", self.zone])
- self.assertIn("Active", out)
- except subprocess.CalledProcessError as e:
- self.fail("pdnsutil show-zone failed: " + e.output)
+ out = pdnsutil("show-zone", self.zone_nodot)
+ self.assertIn("Active", out)