import time
import unittest
from copy import deepcopy
+from parameterized import parameterized
from pprint import pprint
from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor, get_db_records, pdnsutil_rectify
self.assertEquals(data['name'], 'example.com.')
def test_import_zone_broken(self):
- payload = {}
+ payload = {
+ 'name': 'powerdns-broken.com',
+ 'kind': 'Master',
+ 'nameservers': [],
+ }
payload['zone'] = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
- payload['name'] = 'powerdns-broken.com.'
- payload['kind'] = 'Master'
- payload['nameservers'] = []
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
def test_import_zone_axfr_outofzone(self):
# Ensure we don't create out-of-zone records
- name = unique_zone_name()
- payload = {}
+ payload = {
+ 'name': unique_zone_name(),
+ 'kind': 'Master',
+ 'nameservers': [],
+ }
payload['zone'] = """
-NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
-NAME 3600 IN NS powerdnssec2.ds9a.nl.
+%NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
+%NAME% 3600 IN NS powerdnssec2.ds9a.nl.
example.org. 3600 IN AAAA 2001:888:2000:1d::2
-NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
-""".replace('NAME', name)
- payload['name'] = name
- payload['kind'] = 'Master'
- payload['nameservers'] = []
+%NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
+""".replace('%NAME%', payload['name'])
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
self.assertEqual(r.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
def test_import_zone_axfr(self):
- payload = {}
+ payload = {
+ 'name': 'powerdns.com.',
+ 'kind': 'Master',
+ 'nameservers': [],
+ 'soa_edit_api': '', # turn off so exact SOA comparison works.
+ }
payload['zone'] = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
- payload['name'] = 'powerdns.com.'
- payload['kind'] = 'Master'
- payload['nameservers'] = []
- payload['soa_edit_api'] = '' # turn off so exact SOA comparison works.
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
self.assertEqual(dbrec['content'], 'powerdnssec1.ds9a.nl')
def test_import_zone_bind(self):
- payload = {}
+ payload = {
+ 'name': 'example.org.',
+ 'kind': 'Master',
+ 'nameservers': [],
+ 'soa_edit_api': '', # turn off so exact SOA comparison works.
+ }
payload['zone'] = """
$TTL 86400 ; 24 hours could have been written as 24h or 1d
; $TTL used for all RRs without explicit TTL value
bill IN A 192.168.0.3
fred IN A 192.168.0.4
"""
- payload['name'] = 'example.org.'
- payload['kind'] = 'Master'
- payload['nameservers'] = []
- payload['soa_edit_api'] = '' # turn off so exact SOA comparison works.
r = self.session.post(
self.url("/api/v1/servers/localhost/zones"),
data=json.dumps(payload),
eq_zone_rrsets(data['rrsets'], expected)
+ def test_import_zone_bind_cname_apex(self):
+ payload = {
+ 'name': unique_zone_name(),
+ 'kind': 'Master',
+ 'nameservers': [],
+ }
+ payload['zone'] = """
+$ORIGIN %NAME%
+@ IN SOA ns1.example.org. hostmaster.example.org. (2002022401 3H 15 1W 3H)
+@ IN NS ns1.example.org.
+@ IN NS ns2.smokeyjoe.com.
+@ IN CNAME www.example.org.
+""".replace('%NAME%', payload['name'])
+ r = self.session.post(
+ self.url("/api/v1/servers/localhost/zones"),
+ data=json.dumps(payload),
+ headers={'content-type': 'application/json'})
+ self.assertEquals(r.status_code, 422)
+ self.assertIn('Conflicts with another RRset', r.json()['error'])
+
def test_export_zone_json(self):
name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
# export it
self.assertEquals(r.status_code, 422)
self.assertIn('unknown type', r.json()['error'])
- def test_rrset_cname_and_other(self):
+ @parameterized.expand([
+ ('CNAME', ),
+ ('DNAME', ),
+ ])
+ def test_rrset_exclusive_and_other(self, qtype):
name, payload, zone = self.create_zone()
rrset = {
'changetype': 'replace',
'name': name,
- 'type': 'CNAME',
+ 'type': qtype,
'ttl': 3600,
'records': [
{
r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
headers={'content-type': 'application/json'})
self.assertEquals(r.status_code, 422)
- self.assertIn('Conflicts with pre-existing non-CNAME RRset', r.json()['error'])
+ self.assertIn('Conflicts with pre-existing RRset', r.json()['error'])
- def test_rrset_other_and_cname(self):
+ @parameterized.expand([
+ ('CNAME', ),
+ ('DNAME', ),
+ ])
+ def test_rrset_other_and_exclusive(self, qtype):
name, payload, zone = self.create_zone()
rrset = {
'changetype': 'replace',
'name': 'sub.'+name,
- 'type': 'CNAME',
+ 'type': qtype,
'ttl': 3600,
'records': [
{
r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
headers={'content-type': 'application/json'})
self.assertEquals(r.status_code, 422)
- self.assertIn('Conflicts with pre-existing CNAME RRset', r.json()['error'])
-
- def test_rrset_multiple_cnames(self):
+ self.assertIn('Conflicts with pre-existing RRset', r.json()['error'])
+
+ @parameterized.expand([
+ ('SOA', ['ns1.example.org. test@example.org. 10 10800 3600 604800 3600', 'ns2.example.org. test@example.org. 10 10800 3600 604800 3600']),
+ ('CNAME', ['01.example.org.', '02.example.org.']),
+ ('DNAME', ['01.example.org.', '02.example.org.']),
+ ])
+ def test_rrset_single_qtypes(self, qtype, contents):
name, payload, zone = self.create_zone()
rrset = {
'changetype': 'replace',
'name': 'sub.'+name,
- 'type': 'CNAME',
+ 'type': qtype,
'ttl': 3600,
'records': [
{
- "content": "01.example.org.",
+ "content": contents[0],
"disabled": False
},
{
- "content": "02.example.org.",
+ "content": contents[1],
"disabled": False
}
]
r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
headers={'content-type': 'application/json'})
self.assertEquals(r.status_code, 422)
- self.assertIn('/CNAME has more than one record', r.json()['error'])
+ self.assertIn('IN ' + qtype + ' has more than one record', r.json()['error'])
def test_create_zone_with_leading_space(self):
# Actual regression.
self.assertNotEquals(serverset['records'], [])
self.assertEquals(serverset['comments'], [])
+ def test_zone_comment_out_of_range_modified_at(self):
+ # Test if comments on an rrset stay intact if the rrset is replaced
+ name, payload, zone = self.create_zone()
+ rrset = {
+ 'changetype': 'replace',
+ 'name': name,
+ 'type': 'NS',
+ 'comments': [
+ {
+ 'account': 'test1',
+ 'content': 'oh hi there',
+ 'modified_at': '4294967297'
+ }
+ ]
+ }
+ payload = {'rrsets': [rrset]}
+ r = self.session.patch(
+ self.url("/api/v1/servers/localhost/zones/" + name),
+ data=json.dumps(payload),
+ headers={'content-type': 'application/json'})
+ self.assertEquals(r.status_code, 422)
+ self.assertIn("Value for key 'modified_at' is out of range", r.json()['error'])
+
def test_zone_comment_stay_intact(self):
# Test if comments on an rrset stay intact if the rrset is replaced
name, payload, zone = self.create_zone()
u'ttl': 3600, u'type': u'SOA', u'name': name},
])
+ def test_search_rr_exact_zone_filter_type_zone(self):
+ name = unique_zone_name()
+ data_type = "zone"
+ self.create_zone(name=name, serial=22, soa_edit_api='')
+ r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
+ self.assert_success_json(r)
+ print(r.json())
+ self.assertEquals(r.json(), [
+ {u'object_type': u'zone', u'name': name, u'zone_id': name},
+ ])
+
+ def test_search_rr_exact_zone_filter_type_record(self):
+ name = unique_zone_name()
+ data_type = "record"
+ self.create_zone(name=name, serial=22, soa_edit_api='')
+ r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
+ self.assert_success_json(r)
+ print(r.json())
+ self.assertEquals(r.json(), [
+ {u'content': u'ns1.example.com.',
+ u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
+ u'ttl': 3600, u'type': u'NS', u'name': name},
+ {u'content': u'ns2.example.com.',
+ u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
+ u'ttl': 3600, u'type': u'NS', u'name': name},
+ {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
+ u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
+ u'ttl': 3600, u'type': u'SOA', u'name': name},
+ ])
+
def test_search_rr_substring(self):
name = unique_zone_name()
search = name[5:-5]