di.backend->getDomainMetadataOne(zonename, "SOA-EDIT", soa_edit);
doc["soa_edit"] = soa_edit;
- // fill records
- DNSResourceRecord rr;
- Json::array records;
- di.backend->list(zonename, di.id, true); // incl. disabled
- while(di.backend->get(rr)) {
- if (!rr.qtype.getCode())
- continue; // skip empty non-terminals
+ vector<DNSResourceRecord> records;
+ vector<Comment> comments;
- records.push_back(Json::object {
- { "name", rr.qname.toString() },
- { "type", rr.qtype.getName() },
- { "ttl", (double)rr.ttl },
- { "disabled", rr.disabled },
- { "content", makeApiRecordContent(rr.qtype, rr.content) }
- });
+ // load all records + sort
+ {
+ DNSResourceRecord rr;
+ di.backend->list(zonename, di.id, true); // incl. disabled
+ while(di.backend->get(rr)) {
+ if (!rr.qtype.getCode())
+ continue; // skip empty non-terminals
+ records.push_back(rr);
+ }
+ sort(records.begin(), records.end(), [](const DNSResourceRecord& a, const DNSResourceRecord& b) {
+ if (a.qname == b.qname) {
+ return b.qtype < a.qtype;
+ }
+ return b.qname < a.qname;
+ });
}
- doc["records"] = records;
-
- // fill comments
- Comment comment;
- Json::array comments;
- di.backend->listComments(di.id);
- while(di.backend->getComment(comment)) {
- comments.push_back(Json::object {
- { "name", comment.qname.toString() },
- { "type", comment.qtype.getName() },
- { "modified_at", (double)comment.modified_at },
- { "account", comment.account },
- { "content", comment.content }
- });
+
+ // load all comments + sort
+ {
+ Comment comment;
+ di.backend->listComments(di.id);
+ while(di.backend->getComment(comment)) {
+ comments.push_back(comment);
+ }
+ sort(comments.begin(), comments.end(), [](const Comment& a, const Comment& b) {
+ if (a.qname == b.qname) {
+ return b.qtype < a.qtype;
+ }
+ return b.qname < a.qname;
+ });
}
- doc["comments"] = comments;
+
+ Json::array rrsets;
+ Json::object rrset;
+ Json::array rrset_records;
+ Json::array rrset_comments;
+ DNSName current_qname;
+ QType current_qtype;
+ uint32_t ttl;
+ auto rit = records.begin();
+ auto cit = comments.begin();
+
+ while (rit != records.end() || cit != comments.end()) {
+ if (cit == comments.end() || cit->qname.toString() < rit->qname.toString() || cit->qtype < rit->qtype) {
+ current_qname = rit->qname;
+ current_qtype = rit->qtype;
+ ttl = rit->ttl;
+ } else {
+ current_qname = cit->qname;
+ current_qtype = cit->qtype;
+ ttl = 0;
+ }
+
+ while(rit != records.end() && rit->qname == current_qname && rit->qtype == current_qtype) {
+ ttl = min(ttl, rit->ttl);
+ rrset_records.push_back(Json::object {
+ { "disabled", rit->disabled },
+ { "content", makeApiRecordContent(rit->qtype, rit->content) }
+ });
+ rit++;
+ }
+ while (cit != comments.end() && cit->qname == current_qname && cit->qtype == current_qtype) {
+ rrset_comments.push_back(Json::object {
+ { "modified_at", (double)cit->modified_at },
+ { "account", cit->account },
+ { "content", cit->content }
+ });
+ cit++;
+ }
+
+ rrset["name"] = current_qname.toString();
+ rrset["type"] = current_qtype.getName();
+ rrset["records"] = rrset_records;
+ rrset["comments"] = rrset_comments;
+ rrset["ttl"] = (double)ttl;
+ rrsets.push_back(rrset);
+ rrset.clear();
+ rrset_records.clear();
+ rrset_comments.clear();
+ }
+
+ doc["rrsets"] = rrsets;
resp->setBody(doc);
}
out["uptime"] = std::to_string(time(0) - s_starttime);
}
-static void gatherRecords(const Json container, vector<DNSResourceRecord>& new_records, vector<DNSResourceRecord>& new_ptrs) {
+static void gatherRecords(const Json container, const DNSName& qname, const QType qtype, const int ttl, vector<DNSResourceRecord>& new_records, vector<DNSResourceRecord>& new_ptrs) {
UeberBackend B;
DNSResourceRecord rr;
+ rr.qname = qname;
+ rr.qtype = qtype;
+ rr.auth = 1;
+ rr.ttl = ttl;
for(auto record : container["records"].array_items()) {
- rr.qname = apiNameToDNSName(stringFromJson(record, "name"));
- rr.qtype = stringFromJson(record, "type");
string content = stringFromJson(record, "content");
- rr.auth = 1;
- rr.ttl = intFromJson(record, "ttl");
rr.disabled = boolFromJson(record, "disabled");
- if (rr.qtype.getCode() == 0) {
- throw ApiException("Record "+rr.qname.toString()+"/"+stringFromJson(record, "type")+" is of unknown type");
- }
-
// validate that the client sent something we can actually parse, and require that data to be dotted.
try {
if (rr.qtype.getCode() != QType::AAAA) {
}
}
-static void gatherComments(const Json container, vector<Comment>& new_comments, bool use_name_type_from_container) {
+static void gatherComments(const Json container, const DNSName& qname, const QType qtype, vector<Comment>& new_comments) {
Comment c;
- if (use_name_type_from_container) {
- c.qname = stringFromJson(container, "name");
- c.qtype = stringFromJson(container, "type");
- }
+ c.qname = qname;
+ c.qtype = qtype;
time_t now = time(0);
for (auto comment : container["comments"].array_items()) {
- if (!use_name_type_from_container) {
- c.qname = stringFromJson(comment, "name");
- c.qtype = stringFromJson(comment, "type");
- }
c.modified_at = intFromJson(comment, "modified_at", now);
c.content = stringFromJson(comment, "content");
c.account = stringFromJson(comment, "account");
DNSName zonename = apiNameToDNSName(stringFromJson(document, "name"));
apiCheckNameAllowedCharacters(zonename.toString());
- string zonestring = document["zone"].string_value();
-
bool exists = B.getDomainInfo(zonename, di);
if(exists)
throw ApiException("Domain '"+zonename.toString()+"' already exists");
// validate 'kind' is set
DomainInfo::DomainKind zonekind = DomainInfo::stringToKind(stringFromJson(document, "kind"));
- auto records = document["records"];
- if (records.is_array() && zonestring != "")
- throw ApiException("You cannot give zonedata AND records");
+ string zonestring = document["zone"].string_value();
+ auto rrsets = document["rrsets"];
+ if (rrsets.is_array() && zonestring != "")
+ throw ApiException("You cannot give rrsets AND zone data as text");
auto nameservers = document["nameservers"];
if (!nameservers.is_array() && zonekind != DomainInfo::Slave)
vector<Comment> new_comments;
vector<DNSResourceRecord> new_ptrs;
- if (records.is_array()) {
- gatherRecords(document, new_records, new_ptrs);
+ if (rrsets.is_array()) {
+ for (const auto& rrset : rrsets.array_items()) {
+ DNSName qname = apiNameToDNSName(stringFromJson(rrset, "name"));
+ apiCheckQNameAllowedCharacters(qname.toString());
+ QType qtype;
+ qtype = stringFromJson(rrset, "type");
+ if (qtype.getCode() == 0) {
+ throw ApiException("RRset "+qname.toString()+" IN "+stringFromJson(rrset, "type")+": unknown type given");
+ }
+ if (rrset["records"].is_array()) {
+ int ttl = intFromJson(rrset, "ttl");
+ gatherRecords(rrset, qname, qtype, ttl, new_records, new_ptrs);
+ }
+ if (rrset["comments"].is_array()) {
+ gatherComments(rrset, qname, qtype, new_comments);
+ }
+ }
} else if (zonestring != "") {
gatherRecordsFromZone(zonestring, new_records, zonename);
}
- gatherComments(document, new_comments, false);
-
for(auto& rr : new_records) {
if (!rr.qname.isPartOf(zonename) && rr.qname != zonename)
throw ApiException("RRset "+rr.qname.toString()+" IN "+rr.qtype.getName()+": Name is out of zone");
di.backend->getDomainMetadataOne(zonename, "SOA-EDIT", soa_edit_kind);
bool soa_edit_done = false;
- for (auto rrset : rrsets.array_items()) {
- string changetype;
- QType qtype;
+ for (const auto& rrset : rrsets.array_items()) {
+ string changetype = toUpper(stringFromJson(rrset, "changetype"));
DNSName qname = apiNameToDNSName(stringFromJson(rrset, "name"));
apiCheckQNameAllowedCharacters(qname.toString());
+ QType qtype;
qtype = stringFromJson(rrset, "type");
- changetype = toUpper(stringFromJson(rrset, "changetype"));
+ if (qtype.getCode() == 0) {
+ throw ApiException("RRset "+qname.toString()+" IN "+stringFromJson(rrset, "type")+": unknown type given");
+ }
if (changetype == "DELETE") {
// delete all matching qname/qtype RRs (and, implictly comments).
if (!qname.isPartOf(zonename) && qname != zonename)
throw ApiException("RRset "+qname.toString()+" IN "+qtype.getName()+": Name is out of zone");
- new_records.clear();
- new_comments.clear();
- // new_ptrs is merged
- gatherRecords(rrset, new_records, new_ptrs);
- gatherComments(rrset, new_comments, true);
+ bool replace_records = rrset["records"].is_array();
+ bool replace_comments = rrset["comments"].is_array();
- for(DNSResourceRecord& rr : new_records) {
- rr.domain_id = di.id;
+ if (!replace_records && !replace_comments) {
+ throw ApiException("No change for RRset " + qname.toString() + " IN " + qtype.getName());
+ }
- if (rr.qname != qname || rr.qtype != qtype)
- throw ApiException("Record "+rr.qname.toString()+"/"+rr.qtype.getName()+" "+rr.content+": Record wrongly bundled with RRset " + qname.toString() + "/" + qtype.getName());
+ new_records.clear();
+ new_comments.clear();
- if (rr.qtype.getCode() == QType::SOA && rr.qname==zonename) {
- soa_edit_done = increaseSOARecord(rr, soa_edit_api_kind, soa_edit_kind);
- rr.content = makeBackendRecordContent(rr.qtype, rr.content);
+ if (replace_records) {
+ // ttl shouldn't be part of DELETE, and it shouldn't be required if we don't get new records.
+ int ttl = intFromJson(rrset, "ttl");
+ // new_ptrs is merged.
+ gatherRecords(rrset, qname, qtype, ttl, new_records, new_ptrs);
+
+ for(DNSResourceRecord& rr : new_records) {
+ rr.domain_id = di.id;
+ if (rr.qtype.getCode() == QType::SOA && rr.qname==zonename) {
+ soa_edit_done = increaseSOARecord(rr, soa_edit_api_kind, soa_edit_kind);
+ rr.content = makeBackendRecordContent(rr.qtype, rr.content);
+ }
}
}
- for(Comment& c : new_comments) {
- c.domain_id = di.id;
- }
-
- bool replace_records = rrset["records"].is_array();
- bool replace_comments = rrset["comments"].is_array();
+ if (replace_comments) {
+ gatherComments(rrset, qname, qtype, new_comments);
- if (!replace_records && !replace_comments) {
- throw ApiException("No change for RRset " + qname.toString() + "/" + qtype.getName());
+ for(Comment& c : new_comments) {
+ c.domain_id = di.id;
+ }
}
if (replace_records) {
import time
import unittest
from copy import deepcopy
-from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor, eq_zone_dict, get_db_records
+from pprint import pprint
+from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor, get_db_records
+
+
+def get_rrset(data, qname, qtype):
+ for rrset in data['rrsets']:
+ if rrset['name'] == qname and rrset['type'] == qtype:
+ return rrset
+ return None
+
+
+def get_first_rec(data, qname, qtype):
+ rrset = get_rrset(data, qname, qtype)
+ if rrset:
+ return rrset['records'][0]
+ return None
+
+
+def eq_zone_rrsets(rrsets, expected):
+ data_got = {}
+ data_expected = {}
+ for type_, expected_records in expected.iteritems():
+ type_ = str(type_)
+ data_got[type_] = set()
+ data_expected[type_] = set()
+ uses_name = any(['name' in expected_record for expected_record in expected_records])
+ # minify + convert received data
+ for rrset in [rrset for rrset in rrsets if rrset['type'] == type_]:
+ print rrset
+ for r in rrset['records']:
+ data_got[type_].add((rrset['name'] if uses_name else '@', rrset['type'], r['content']))
+ # minify expected data
+ for r in expected_records:
+ data_expected[type_].add((r['name'] if uses_name else '@', type_, r['content']))
+
+ print "eq_zone_rrsets: got:"
+ pprint(data_got)
+ print "eq_zone_rrsets: expected:"
+ pprint(data_expected)
+
+ assert data_got == data_expected, "%r != %r" % (data_got, data_expected)
class Zones(ApiTestCase):
self.assertEquals(r.status_code, 201)
reply = r.json()
print "reply", reply
- return payload, reply
+ return name, payload, reply
@unittest.skipIf(not is_auth(), "Not applicable")
def test_create_zone(self):
# soa_edit_api has a default, override with empty for this test
- payload, data = self.create_zone(serial=22, soa_edit_api='')
+ name, payload, data = self.create_zone(serial=22, soa_edit_api='')
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
self.assertIn(k, data)
if k in payload:
self.assertEquals(data[k], payload[k])
- self.assertEquals(data['comments'], [])
# validate generated SOA
- expected_soa = "a.misconfigured.powerdns.server. hostmaster." + payload['name'] + " " + \
+ expected_soa = "a.misconfigured.powerdns.server. hostmaster." + name + " " + \
str(payload['serial']) + " 10800 3600 604800 3600"
self.assertEquals(
- [r['content'] for r in data['records'] if r['type'] == 'SOA'][0],
+ get_first_rec(data, name, 'SOA')['content'],
expected_soa
)
# Because we had confusion about dots, check that the DB is without dots.
- dbrecs = get_db_records(payload['name'], 'SOA')
+ dbrecs = get_db_records(name, 'SOA')
self.assertEqual(dbrecs[0]['content'], expected_soa.replace('. ', ' '))
def test_create_zone_with_soa_edit_api(self):
# soa_edit_api wins over serial
- payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
+ name, payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
for k in ('soa_edit_api', ):
self.assertIn(k, data)
if k in payload:
# generated EPOCH serial surely is > fixed serial we passed in
print data
self.assertGreater(data['serial'], payload['serial'])
- soa_serial = int([r['content'].split(' ')[2] for r in data['records'] if r['type'] == 'SOA'][0])
+ soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
self.assertGreater(soa_serial, payload['serial'])
self.assertEquals(soa_serial, data['serial'])
def test_create_zone_with_account(self):
# soa_edit_api wins over serial
- payload, data = self.create_zone(account='anaccount', serial=10)
+ name, payload, data = self.create_zone(account='anaccount', serial=10)
print data
for k in ('account', ):
self.assertIn(k, data)
def test_create_zone_with_records(self):
name = unique_zone_name()
- records = [
- {
- "name": name,
- "type": "A",
- "ttl": 3600,
+ rrset = {
+ "name": name,
+ "type": "A",
+ "ttl": 3600,
+ "records": [{
"content": "4.3.2.1",
- "disabled": False
- }
- ]
- payload, data = self.create_zone(name=name, records=records)
+ "disabled": False,
+ }],
+ }
+ name, payload, data = self.create_zone(name=name, rrsets=[rrset])
# check our record has appeared
- self.assertEquals([r for r in data['records'] if r['type'] == records[0]['type']], records)
+ self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
def test_create_zone_with_wildcard_records(self):
name = unique_zone_name()
- records = [
- {
- "name": "*."+name,
- "type": "A",
- "ttl": 3600,
+ rrset = {
+ "name": "*."+name,
+ "type": "A",
+ "ttl": 3600,
+ "records": [{
"content": "4.3.2.1",
- "disabled": False
- }
- ]
- payload, data = self.create_zone(name=name, records=records)
+ "disabled": False,
+ }],
+ }
+ name, payload, data = self.create_zone(name=name, rrsets=[rrset])
# check our record has appeared
- self.assertEquals([r for r in data['records'] if r['type'] == records[0]['type']], records)
-
+ self.assertEquals(get_rrset(data, rrset['name'], 'A')['records'], rrset['records'])
def test_create_zone_with_comments(self):
name = unique_zone_name()
- comments = [
- {
- 'name': name,
- 'type': 'soa', # test uppercasing of type, too.
- 'account': 'test1',
- 'content': 'blah blah',
- 'modified_at': 11112,
- }
- ]
- payload, data = self.create_zone(name=name, comments=comments)
- comments[0]['type'] = comments[0]['type'].upper()
+ rrset = {
+ "name": name,
+ "type": "soa", # test uppercasing of type, too.
+ "comments": [{
+ "account": "test1",
+ "content": "blah blah",
+ "modified_at": 11112,
+ }],
+ }
+ name, payload, data = self.create_zone(name=name, rrsets=[rrset])
# check our comment has appeared
- self.assertEquals(data['comments'], comments)
+ self.assertEquals(get_rrset(data, name, 'SOA')['comments'], rrset['comments'])
def test_create_zone_uncanonical_nameservers(self):
name = unique_zone_name()
def test_create_zone_with_custom_soa(self):
name = unique_zone_name()
- records = [
- {
- u"name": name,
- u"type": u"soa", # test uppercasing of type, too.
- u"ttl": 3600,
- u"content": u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600",
- u"disabled": False
- }
- ]
- payload, data = self.create_zone(name=name, records=records, soa_edit_api='')
- records[0]['type'] = records[0]['type'].upper()
- self.assertEquals([r for r in data['records'] if r['type'] == records[0]['type']], records)
- dbrecs = get_db_records(name, records[0]['type'])
- self.assertEqual(dbrecs[0]['content'], records[0]['content'].replace('. ', ' '))
+ content = u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
+ rrset = {
+ "name": name,
+ "type": "soa", # test uppercasing of type, too.
+ "ttl": 3600,
+ "records": [{
+ "content": content,
+ "disabled": False,
+ }],
+ }
+ name, payload, data = self.create_zone(name=name, rrsets=[rrset], soa_edit_api='')
+ self.assertEquals(get_rrset(data, name, 'SOA')['records'], rrset['records'])
+ dbrecs = get_db_records(name, 'SOA')
+ self.assertEqual(dbrecs[0]['content'], content.replace('. ', ' '))
def test_create_zone_double_dot(self):
name = 'test..' + unique_zone_name()
self.assertIn('contains unsupported characters', r.json()['error'])
def test_create_zone_with_symbols(self):
- payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
+ name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
name = payload['name']
expected_id = name.replace('/', '=2F')
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
def test_create_slave_zone(self):
# Test that nameservers can be absent for slave zones.
- payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
+ name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
for k in ('name', 'masters', 'kind'):
self.assertIn(k, data)
self.assertEquals(data[k], payload[k])
self.assertIn(k, data)
self.assertEquals(data[k], payload[k])
self.assertEqual(data['serial'], 0)
- self.assertEqual(data['records'], [])
+ self.assertEqual(data['rrsets'], [])
def test_delete_slave_zone(self):
- payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
+ name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + data['id']))
r.raise_for_status()
def test_retrieve_slave_zone(self):
- payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
+ name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
print "payload:", payload
print "data:", data
r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/axfr-retrieve"))
'\' from master 127.0.0.2')
def test_notify_master_zone(self):
- payload, data = self.create_zone(kind='Master')
+ name, payload, data = self.create_zone(kind='Master')
print "payload:", payload
print "data:", data
r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/notify"))
self.assertEqual(data['result'], 'Notification queued')
def test_get_zone_with_symbols(self):
- payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
+ name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
name = payload['name']
zone_id = (name.replace('/', '=2F'))
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id))
self.assert_success_json(r)
data = r.json()
self.assertIn('name', data)
- self.assertIn('records', data)
expected = {
'NS': [
- { 'content': 'powerdnssec1.ds9a.nl.' },
- { 'content': 'powerdnssec2.ds9a.nl.' } ],
+ {'content': 'powerdnssec1.ds9a.nl.'},
+ {'content': 'powerdnssec2.ds9a.nl.'},
+ ],
'SOA': [
- { 'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800' } ],
+ {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
+ ],
'MX': [
- { 'content': '0 xs.powerdns.com.' } ],
+ {'content': '0 xs.powerdns.com.'},
+ ],
'A': [
- { 'content': '82.94.213.34', 'name': 'powerdns.com.' } ],
+ {'content': '82.94.213.34', 'name': 'powerdns.com.'},
+ ],
'AAAA': [
- { 'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.' } ]
+ {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
+ ],
}
- eq_zone_dict(data['records'], expected)
+ eq_zone_rrsets(data['rrsets'], expected)
# noDot check
dbrecs = get_db_records(payload['name'], 'NS')
self.assert_success_json(r)
data = r.json()
self.assertIn('name', data)
- self.assertIn('records', data)
expected = {
'NS': [
- { 'content': 'ns1.example.org.' },
- { 'content': 'ns2.smokeyjoe.com.' } ],
+ {'content': 'ns1.example.org.'},
+ {'content': 'ns2.smokeyjoe.com.'},
+ ],
'SOA': [
- { 'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800' } ],
+ {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
+ ],
'MX': [
- { 'content': '10 mail.another.com.' } ],
+ {'content': '10 mail.another.com.'},
+ ],
'A': [
- { 'content': '192.168.0.1', 'name': 'ns1.example.org.' },
- { 'content': '192.168.0.2', 'name': 'www.example.org.' },
- { 'content': '192.168.0.3', 'name': 'bill.example.org.' },
- { 'content': '192.168.0.4', 'name': 'fred.example.org.' } ],
+ {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
+ {'content': '192.168.0.2', 'name': 'www.example.org.'},
+ {'content': '192.168.0.3', 'name': 'bill.example.org.'},
+ {'content': '192.168.0.4', 'name': 'fred.example.org.'},
+ ],
'CNAME': [
- { 'content': 'www.example.org.', 'name': 'ftp.example.org.' } ]
+ {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
+ ],
}
- eq_zone_dict(data['records'], expected)
+ eq_zone_rrsets(data['rrsets'], expected)
def test_export_zone_json(self):
- payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
- name = payload['name']
+ name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
# export it
r = self.session.get(
self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
self.assertEquals(data['zone'].strip().split('\n'), expected_data)
def test_export_zone_text(self):
- payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
- name = payload['name']
+ name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
# export it
r = self.session.get(
self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
self.assertEquals(data, expected_data)
def test_update_zone(self):
- payload, zone = self.create_zone()
+ name, payload, zone = self.create_zone()
name = payload['name']
# update, set as Master and enable SOA-EDIT-API
payload = {
self.assertEquals(data[k], payload[k])
def test_zone_rr_update(self):
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
# do a replace (= update)
rrset = {
'changetype': 'replace',
'name': name,
'type': 'ns',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "NS",
- "ttl": 3600,
"content": "ns1.bar.com.",
"disabled": False
},
{
- "name": name,
- "type": "NS",
- "ttl": 1800,
"content": "ns2-disabled.bar.com.",
"disabled": True
}
self.assert_success_json(r)
# verify that (only) the new record is there
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
- rrset['type'] = rrset['type'].upper()
- data = r.json()['records']
- recs = [rec for rec in data if rec['type'].upper() == rrset['type'].upper() and rec['name'].upper() == rrset['name'].upper()]
- self.assertEquals(recs, rrset['records'])
+ data = r.json()
+ self.assertEquals(get_rrset(data, name, 'NS')['records'], rrset['records'])
def test_zone_rr_update_mx(self):
# Important to test with MX records, as they have a priority field, which must end up in the content field.
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
# do a replace (= update)
rrset = {
'changetype': 'replace',
'name': name,
'type': 'MX',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "MX",
- "ttl": 3600,
"content": "10 mail.example.org.",
"disabled": False
}
self.assert_success_json(r)
# verify that (only) the new record is there
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
- data = r.json()['records']
- recs = [rec for rec in data if rec['type'] == rrset['type'] and rec['name'] == rrset['name']]
- self.assertEquals(recs, rrset['records'])
+ data = r.json()
+ self.assertEquals(get_rrset(data, name, 'MX')['records'], rrset['records'])
def test_zone_rr_update_multiple_rrsets(self):
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
rrset1 = {
'changetype': 'replace',
'name': name,
'type': 'NS',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "NS",
- "ttl": 3600,
+
"content": "ns9999.example.com.",
"disabled": False
}
'changetype': 'replace',
'name': name,
'type': 'MX',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "MX",
- "ttl": 3600,
"content": "10 mx444.example.com.",
"disabled": False
}
self.assert_success_json(r)
# verify that all rrsets have been updated
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
- data = r.json()['records']
- recs1 = [rec for rec in data if rec['type'] == rrset1['type'] and rec['name'] == rrset1['name']]
- self.assertEquals(recs1, rrset1['records'])
- recs2 = [rec for rec in data if rec['type'] == rrset2['type'] and rec['name'] == rrset2['name']]
- self.assertEquals(recs2, rrset2['records'])
+ data = r.json()
+ self.assertEquals(get_rrset(data, name, 'NS')['records'], rrset1['records'])
+ self.assertEquals(get_rrset(data, name, 'MX')['records'], rrset2['records'])
def test_zone_rr_delete(self):
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
# do a delete of all NS records (these are created with the zone)
rrset = {
'changetype': 'delete',
self.assert_success_json(r)
# verify that the records are gone
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
- data = r.json()['records']
- recs = [rec for rec in data if rec['type'] == rrset['type'] and rec['name'] == rrset['name']]
- self.assertEquals(recs, [])
+ data = r.json()
+ self.assertIsNone(get_rrset(data, name, 'NS'))
def test_zone_disable_reenable(self):
# This also tests that SOA-EDIT-API works.
- payload, zone = self.create_zone(soa_edit_api='EPOCH')
- name = payload['name']
+ name, payload, zone = self.create_zone(soa_edit_api='EPOCH')
# disable zone by disabling SOA
rrset = {
'changetype': 'replace',
'name': name,
'type': 'SOA',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "SOA",
- "ttl": 3600,
"content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
"disabled": True
}
headers={'content-type': 'application/json'})
self.assert_success_json(r)
# check SOA serial has been edited
- print r.json()
- soa_serial1 = [rec for rec in r.json()['records'] if rec['type'] == 'SOA'][0]['content'].split()[2]
+ soa_serial1 = get_first_rec(r.json(), name, 'SOA')['content'].split()[2]
self.assertNotEquals(soa_serial1, '1')
# make sure domain is still in zone list (disabled SOA!)
r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
self.assert_success_json(r)
# check SOA serial has been edited again
print r.json()
- soa_serial2 = [rec for rec in r.json()['records'] if rec['type'] == 'SOA'][0]['content'].split()[2]
+ soa_serial2 = get_first_rec(r.json(), name, 'SOA')['content'].split()[2]
self.assertNotEquals(soa_serial2, '1')
self.assertNotEquals(soa_serial2, soa_serial1)
- def test_zone_rr_update_qtype_mismatch(self):
- payload, zone = self.create_zone()
- name = payload['name']
- # replace with qtype mismatch
- rrset = {
- 'changetype': 'replace',
- 'name': name,
- 'type': 'A',
- 'records': [
- {
- "name": name,
- "type": "NS",
- "ttl": 3600,
- "content": "ns1.bar.com.",
- "disabled": False
- }
- ]
- }
- payload = {'rrsets': [rrset]}
- r = self.session.patch(
- self.url("/api/v1/servers/localhost/zones/" + name),
- data=json.dumps(payload),
- headers={'content-type': 'application/json'})
- self.assertEquals(r.status_code, 422)
-
- def test_zone_rr_update_qname_mismatch(self):
- payload, zone = self.create_zone()
- name = payload['name']
- # replace with qname mismatch
- rrset = {
- 'changetype': 'replace',
- 'name': name,
- 'type': 'NS',
- 'records': [
- {
- "name": 'blah.'+name,
- "type": "NS",
- "ttl": 3600,
- "content": "ns1.bar.com.",
- "disabled": False
- }
- ]
- }
- payload = {'rrsets': [rrset]}
- r = self.session.patch(
- self.url("/api/v1/servers/localhost/zones/" + name),
- data=json.dumps(payload),
- headers={'content-type': 'application/json'})
- self.assertEquals(r.status_code, 422)
-
def test_zone_rr_update_out_of_zone(self):
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
# replace with qname mismatch
rrset = {
'changetype': 'replace',
'name': 'not-in-zone.',
'type': 'NS',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "NS",
- "ttl": 3600,
"content": "ns1.bar.com.",
"disabled": False
}
self.assertIn('out of zone', r.json()['error'])
def test_zone_rr_update_restricted_chars(self):
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
# replace with qname mismatch
rrset = {
'changetype': 'replace',
'name': 'test:' + name,
'type': 'NS',
+ 'ttl': 3600,
'records': [
{
- "name": 'test:' + name,
- "type": "NS",
- "ttl": 3600,
"content": "ns1.bar.com.",
"disabled": False
}
self.assertIn('contains unsupported characters', r.json()['error'])
def test_rrset_unknown_type(self):
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
rrset = {
'changetype': 'replace',
'name': name,
'type': 'FAFAFA',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "FAFAFA",
- "ttl": 3600,
"content": "4.3.2.1",
"disabled": False
}
def test_create_zone_with_leading_space(self):
# Actual regression.
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
rrset = {
'changetype': 'replace',
'name': name,
'type': 'A',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "A",
- "ttl": 3600,
"content": " 4.3.2.1",
"disabled": False
}
self.assertIn('Not in expected format', r.json()['error'])
def test_zone_rr_delete_out_of_zone(self):
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
rrset = {
'changetype': 'delete',
'name': 'not-in-zone.',
self.assertEquals(r.status_code, 200) # succeed so users can fix their wrong, old data
def test_zone_delete(self):
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
self.assertEquals(r.status_code, 204)
self.assertNotIn('Content-Type', r.headers)
def test_zone_comment_create(self):
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
rrset = {
'changetype': 'replace',
'name': name,
'type': 'NS',
+ 'ttl': 3600,
'comments': [
{
'account': 'test1',
# make sure the comments have been set, and that the NS
# records are still present
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
- data = r.json()
- print data
- self.assertNotEquals([r for r in data['records'] if r['type'] == 'NS'], [])
- self.assertNotEquals(data['comments'], [])
+ serverset = get_rrset(r.json(), name, 'NS')
+ print serverset
+ self.assertNotEquals(serverset['records'], [])
+ self.assertNotEquals(serverset['comments'], [])
# verify that modified_at has been set by pdns
- self.assertNotEquals([c for c in data['comments']][0]['modified_at'], 0)
+ self.assertNotEquals([c for c in serverset['comments']][0]['modified_at'], 0)
def test_zone_comment_delete(self):
# Test: Delete ONLY comments.
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
rrset = {
'changetype': 'replace',
'name': name,
self.assert_success_json(r)
# make sure the NS records are still present
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
- data = r.json()
- print data
- self.assertNotEquals([r for r in data['records'] if r['type'] == 'NS'], [])
- self.assertEquals(data['comments'], [])
+ serverset = get_rrset(r.json(), name, 'NS')
+ print serverset
+ self.assertNotEquals(serverset['records'], [])
+ self.assertEquals(serverset['comments'], [])
def test_zone_comment_stay_intact(self):
# Test if comments on an rrset stay intact if the rrset is replaced
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
# create a comment
rrset = {
'changetype': 'replace',
'changetype': 'replace',
'name': name,
'type': 'NS',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "NS",
- "ttl": 3600,
"content": "ns1.bar.com.",
"disabled": False
}
self.assert_success_json(r)
# make sure the comments still exist
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
- data = r.json()
- print data
- # fix up input data for comparison with assertEquals.
- # the fact that we're not sending name+type is part of the API spec.
- for c in rrset['comments']:
- c['name'] = rrset['name']
- c['type'] = rrset['type']
-
- self.assertEquals([r for r in data['records'] if r['type'] == 'NS'], rrset2['records'])
- self.assertEquals(data['comments'], rrset['comments'])
+ serverset = get_rrset(r.json(), name, 'NS')
+ print serverset
+ self.assertEquals(serverset['records'], rrset2['records'])
+ self.assertEquals(serverset['comments'], rrset['comments'])
def test_zone_auto_ptr_ipv4(self):
revzone = '0.2.192.in-addr.arpa.'
self.create_zone(name=revzone)
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
# replace with qname mismatch
rrset = {
'changetype': 'replace',
'name': name,
'type': 'A',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "A",
- "ttl": 3600,
"content": '192.2.0.2',
"disabled": False,
"set-ptr": True
headers={'content-type': 'application/json'})
self.assert_success_json(r)
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone))
- recs = r.json()['records']
- print recs
- revrec = [rec for rec in recs if rec['type'] == 'PTR']
- self.assertEquals(revrec, [{
- u'content': name,
- u'disabled': False,
+ revsets = [s for s in r.json()['rrsets'] if s['type'] == 'PTR']
+ print revsets
+ self.assertEquals(revsets, [{
+ u'name': u'2.0.2.192.in-addr.arpa.',
u'ttl': 3600,
u'type': u'PTR',
- u'name': u'2.0.2.192.in-addr.arpa.'
+ u'comments': [],
+ u'records': [{
+ u'content': name,
+ u'disabled': False,
+ }],
}])
def test_zone_auto_ptr_ipv6(self):
# 2001:DB8::bb:aa
revzone = '8.b.d.0.1.0.0.2.ip6.arpa.'
self.create_zone(name=revzone)
- payload, zone = self.create_zone()
- name = payload['name']
+ name, payload, zone = self.create_zone()
# replace with qname mismatch
rrset = {
'changetype': 'replace',
'name': name,
'type': 'AAAA',
+ 'ttl': 3600,
'records': [
{
- "name": name,
- "type": "AAAA",
- "ttl": 3600,
"content": '2001:DB8::bb:aa',
"disabled": False,
"set-ptr": True
headers={'content-type': 'application/json'})
self.assert_success_json(r)
r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone))
- recs = r.json()['records']
- print recs
- revrec = [rec for rec in recs if rec['type'] == 'PTR']
- self.assertEquals(revrec, [{
- u'content': name,
- u'disabled': False,
+ revsets = [s for s in r.json()['rrsets'] if s['type'] == 'PTR']
+ print revsets
+ self.assertEquals(revsets, [{
+ u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
u'ttl': 3600,
u'type': u'PTR',
- u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.'
+ u'comments': [],
+ u'records': [{
+ u'content': name,
+ u'disabled': False,
+ }],
}])
def test_search_rr_exact_zone(self):
self.session.delete(self.url("/api/v1/servers/localhost/zones/=2E"))
def test_create_zone(self):
- payload, data = self.create_zone(name='.', serial=22, soa_edit_api='')
+ name, payload, data = self.create_zone(name='.', serial=22, soa_edit_api='')
for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
self.assertIn(k, data)
if k in payload:
self.assertEquals(data[k], payload[k])
- self.assertEquals(data['comments'], [])
# validate generated SOA
+ rec = get_first_rec(data, '.', 'SOA')
self.assertEquals(
- [r['content'] for r in data['records'] if r['type'] == 'SOA'][0],
+ rec['content'],
"a.misconfigured.powerdns.server. hostmaster. " + str(payload['serial']) +
" 10800 3600 604800 3600"
)
for k in ('name', 'kind'):
self.assertIn(k, data)
self.assertEquals(data[k], payload[k])
- self.assertEqual(data['records'][0]['name'], '.')
+ self.assertEqual(data['rrsets'][0]['name'], '.')
def test_update_zone(self):
- payload, zone = self.create_zone(name='.')
+ name, payload, zone = self.create_zone(name='.')
zone_id = '=2E'
# update, set as Master and enable SOA-EDIT-API
payload = {