]> git.ipfire.org Git - thirdparty/pdns.git/blame_incremental - regression-tests.api/test_Zones.py
Merge pull request #15791 from miodvallat/udon
[thirdparty/pdns.git] / regression-tests.api / test_Zones.py
... / ...
CommitLineData
1from __future__ import print_function
2import json
3import operator
4import time
5import unittest
6import requests.exceptions
7from copy import deepcopy
8from parameterized import parameterized
9from pprint import pprint
10from test_helper import ApiTestCase, unique_zone_name, is_auth, is_auth_lmdb, is_recursor, get_db_records, pdnsutil_rectify, sdig
11
12
def get_rrset(data, qname, qtype):
    """Return the first rrset in ``data['rrsets']`` with the given name and type, or None."""
    matches = (
        candidate for candidate in data['rrsets']
        if candidate['name'] == qname and candidate['type'] == qtype
    )
    return next(matches, None)
18
19
def get_first_rec(data, qname, qtype):
    """Return the first record of the rrset matching name and type, or None if no such rrset."""
    matching = get_rrset(data, qname, qtype)
    return matching['records'][0] if matching else None
25
26
def eq_zone_rrsets(rrsets, expected):
    """
    Assert that `rrsets` holds exactly the records listed in `expected`.

    `expected` maps a record type to a list of record dicts. Only the types
    present in `expected` are compared. Owner names are compared for a type
    only when at least one of its expected records has a 'name' key; otherwise
    every name is replaced by '@'.
    """
    got_by_type = {}
    expected_by_type = {}
    for raw_type, wanted_records in expected.items():
        qtype = str(raw_type)
        name_flags = ['name' in wanted for wanted in wanted_records]
        with_names = any(name_flags)

        # Reduce the received rrsets of this type to (name, type, content) tuples.
        received = set()
        for rrset in rrsets:
            if rrset['type'] != qtype:
                continue
            print(rrset)
            for rec in rrset['records']:
                received.add((rrset['name'] if with_names else '@', rrset['type'], rec['content']))
        got_by_type[qtype] = received

        # Same reduction for the expected records.
        expected_by_type[qtype] = {
            (wanted['name'] if with_names else '@', qtype, wanted['content'])
            for wanted in wanted_records
        }

    print("eq_zone_rrsets: got:")
    pprint(got_by_type)
    print("eq_zone_rrsets: expected:")
    pprint(expected_by_type)

    assert got_by_type == expected_by_type, "%r != %r" % (got_by_type, expected_by_type)
50
51
def assert_eq_rrsets(rrsets, expected):
    """Assert that both rrset lists are equal once ordered by (name, type)."""
    by_name_and_type = operator.itemgetter('name', 'type')
    got_sorted = sorted(rrsets, key=by_name_and_type)
    expected_sorted = sorted(expected, key=by_name_and_type)
    assert got_sorted == expected_sorted
56
57
def templated_rrsets(rrsets: list, zonename: str):
    """
    Return copies of the given rrsets with $NAME$ replaced by `zonename`.

    The placeholder is substituted in each rrset's `name` and in the `content`
    of each of its records. The input rrsets and records are not modified.
    """
    def fill(text):
        return text.replace('$NAME$', zonename)

    rendered_rrsets = []
    for source in rrsets:
        rendered = {**source, "name": fill(source["name"])}
        if "records" in source:
            rendered["records"] = [
                {**rec, "content": fill(rec["content"])} for rec in source["records"]
            ]
        rendered_rrsets.append(rendered)
    return rendered_rrsets
76
77
class Zones(ApiTestCase):
    """Zone listing tests that run against either the authoritative server or the recursor."""

    def _test_list_zones(self, dnssec=True):
        """
        List all zones and check the fields reported for example.com.

        With dnssec=False the listing is requested with "?dnssec=false" and,
        on the authoritative server, the 'dnssec' field must be absent.
        """
        path = "/api/v1/servers/localhost/zones"
        if not dnssec:
            path = path + "?dnssec=false"
        r = self.session.get(self.url(path))
        self.assert_success_json(r)
        domains = r.json()
        example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
        self.assertEqual(len(example_com), 1)
        example_com = example_com[0]
        print(example_com)
        required_fields = ['id', 'url', 'name', 'kind']
        if is_auth():
            required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial', 'account', 'catalog']
            if dnssec:
                # Extend the list; assigning a fresh list here would drop every
                # field check collected so far.
                required_fields = required_fields + ['dnssec', 'edited_serial']
            self.assertNotEqual(example_com['serial'], 0)
            if not dnssec:
                self.assertNotIn('dnssec', example_com)
        elif is_recursor():
            required_fields = required_fields + ['recursion_desired', 'servers']
        for field in required_fields:
            self.assertIn(field, example_com)

    def test_list_zones_with_dnssec(self):
        """List zones with DNSSEC information; only run against the authoritative server."""
        if is_auth():
            self._test_list_zones(True)

    def test_list_zones_without_dnssec(self):
        """List zones with "?dnssec=false"."""
        self._test_list_zones(False)
110
111
class AuthZonesHelperMixin:
    """
    Helpers to create, fetch and update zones through the HTTP API.

    Meant to be mixed into a test case that provides `self.session`,
    `self.url()` and the assert* methods used below.
    """

    def create_zone(self, name=None, expect_error=None, **kwargs):
        """
        POST a new zone and check the response status.

        The default payload is a Native zone with two nameservers. Every
        keyword argument overrides the payload entry of the same name; a value
        of None removes the entry instead (and is ignored if there is no such
        entry). $NAME$ in a `zone` string and in `rrsets` is replaced with the
        zone name.

        If `expect_error` is falsy, a 201 response is required. If it is True,
        a 422 response is required. Any other truthy value must additionally
        be contained in the reply's `error` field.

        Returns a (name, payload, reply) tuple.
        """
        if name is None:
            name = unique_zone_name()
        payload = {
            "name": name,
            "kind": "Native",
            "nameservers": ["ns1.example.com.", "ns2.example.com."]
        }
        for k, v in kwargs.items():
            if v is None:
                # pop() instead of del: None for a key that has no default
                # must not raise KeyError.
                payload.pop(k, None)
            else:
                payload[k] = v
        if "zone" in payload:
            payload["zone"] = payload["zone"].replace("$NAME$", payload["name"])
        if "rrsets" in payload:
            payload["rrsets"] = templated_rrsets(payload["rrsets"], payload["name"])

        print("Create zone", name, "with:", payload)
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={"content-type": "application/json"})

        if expect_error:
            self.assertEqual(r.status_code, 422, r.content)
            reply = r.json()
            if expect_error is not True:
                self.assertIn(expect_error, reply["error"])
        else:
            # expect success
            self.assertEqual(r.status_code, 201, r.content)
            reply = r.json()

        return name, payload, reply

    def get_zone(self, api_zone_id, expect_error=None, **kwargs):
        """
        GET a zone by its API id and return the decoded JSON reply.

        Keyword arguments are sent as query parameters. `expect_error` works
        as in create_zone(), except that success means a 200 response.
        """
        print("GET zone", api_zone_id, "args:", kwargs)
        r = self.session.get(
            self.url("/api/v1/servers/localhost/zones/" + api_zone_id),
            params=kwargs
        )

        reply = r.json()
        print("reply", reply)

        if expect_error:
            self.assertEqual(r.status_code, 422)
            if expect_error is not True:
                # reuse the already decoded body
                self.assertIn(expect_error, reply['error'])
        else:
            # expect success
            self.assert_success_json(r)
            self.assertEqual(r.status_code, 200)

        return reply

    def put_zone(self, api_zone_id, payload, expect_error=None):
        """
        PUT `payload` to a zone and check the response status.

        `expect_error` works as in create_zone(), except that success means a
        204 (no content) response. Nothing is returned.
        """
        print("PUT zone", api_zone_id, "with:", payload)
        r = self.session.put(
            self.url("/api/v1/servers/localhost/zones/" + api_zone_id),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})

        print("reply status code:", r.status_code)
        if expect_error:
            self.assertEqual(r.status_code, 422, r.content)
            reply = r.json()
            if expect_error is not True:
                self.assertIn(expect_error, reply['error'])
        else:
            # expect success (no content)
            self.assertEqual(r.status_code, 204, r.content)
193@unittest.skipIf(not is_auth(), "Not applicable")
194class AuthZones(ApiTestCase, AuthZonesHelperMixin):
195
196 def test_create_zone(self):
197 # soa_edit_api has a default, override with empty for this test
198 name, payload, data = self.create_zone(serial=22, soa_edit_api='')
199 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'edited_serial', 'soa_edit_api', 'soa_edit', 'account'):
200 self.assertIn(k, data)
201 if k in payload:
202 self.assertEqual(data[k], payload[k])
203 # validate generated SOA
204 expected_soa = "a.misconfigured.dns.server.invalid. hostmaster." + name + " " + \
205 str(payload['serial']) + " 10800 3600 604800 3600"
206 self.assertEqual(
207 get_first_rec(data, name, 'SOA')['content'],
208 expected_soa
209 )
210
211 if not is_auth_lmdb():
212 # Because we had confusion about dots, check that the DB is without dots.
213 dbrecs = get_db_records(name, 'SOA')
214 self.assertEqual(dbrecs[0]['content'], expected_soa.replace('. ', ' '))
215 self.assertNotEqual(data['serial'], data['edited_serial'])
216
217 def test_create_zone_with_soa_edit_api(self):
218 # soa_edit_api wins over serial
219 name, payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
220 for k in ('soa_edit_api', ):
221 self.assertIn(k, data)
222 if k in payload:
223 self.assertEqual(data[k], payload[k])
224 # generated EPOCH serial surely is > fixed serial we passed in
225 print(data)
226 self.assertGreater(data['serial'], payload['serial'])
227 soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
228 self.assertGreater(soa_serial, payload['serial'])
229 self.assertEqual(soa_serial, data['serial'])
230
231 def test_create_zone_with_catalog(self):
232 # soa_edit_api wins over serial
233 name, payload, data = self.create_zone(catalog='catalog.invalid.', serial=10)
234 print(data)
235 for k in ('catalog', ):
236 self.assertIn(k, data)
237 if k in payload:
238 self.assertEqual(data[k], payload[k])
239
240 # check that the catalog is reflected in the /zones output (#13633)
241 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
242 self.assert_success_json(r)
243 domains = r.json()
244 domain = [domain for domain in domains if domain['name'] == name]
245 self.assertEqual(len(domain), 1)
246 domain = domain[0]
247 self.assertEqual(domain["catalog"], "catalog.invalid.")
248
249 def test_create_zone_with_account(self):
250 # soa_edit_api wins over serial
251 name, payload, data = self.create_zone(account='anaccount', serial=10, kind='Master')
252 print(data)
253 for k in ('account', ):
254 self.assertIn(k, data)
255 if k in payload:
256 self.assertEqual(data[k], payload[k])
257
258 # as we did not set a catalog in our request, check that the default catalog was applied
259 self.assertEqual(data['catalog'], "default-catalog.example.com.")
260
261 def test_create_zone_default_soa_edit_api(self):
262 name, payload, data = self.create_zone()
263 print(data)
264 self.assertEqual(data['soa_edit_api'], 'DEFAULT')
265
266 def test_create_zone_exists(self):
267 name, payload, data = self.create_zone()
268 print(data)
269 payload = {
270 'name': name,
271 'kind': 'Native'
272 }
273 print(payload)
274 r = self.session.post(
275 self.url("/api/v1/servers/localhost/zones"),
276 data=json.dumps(payload),
277 headers={'content-type': 'application/json'})
278 self.assertEqual(r.status_code, 409) # Conflict - already exists
279
280 def test_create_zone_with_soa_edit(self):
281 name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
282 print(data)
283 self.assertEqual(data['soa_edit'], 'INCEPTION-INCREMENT')
284 self.assertEqual(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
285 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
286 # These particular settings lead to the first serial set to YYYYMMDD01.
287 self.assertEqual(soa_serial[-2:], '01')
288 rrset = {
289 'changetype': 'replace',
290 'name': name,
291 'type': 'A',
292 'ttl': 3600,
293 'records': [
294 {
295 "content": "127.0.0.1",
296 "disabled": False
297 }
298 ]
299 }
300 payload = {'rrsets': [rrset]}
301 r = self.session.patch(
302 self.url("/api/v1/servers/localhost/zones/" + data['id']),
303 data=json.dumps(payload),
304 headers={'content-type': 'application/json'})
305 data = self.get_zone(data['id'])
306 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
307 self.assertEqual(soa_serial[-2:], '02')
308 self.assertEqual(r.headers['X-PDNS-Old-Serial'][-2:], '01')
309 self.assertEqual(r.headers['X-PDNS-New-Serial'][-2:], '02')
310
311 def test_create_zone_with_records(self):
312 name = unique_zone_name()
313 rrset = {
314 "name": name,
315 "type": "A",
316 "ttl": 3600,
317 "records": [{
318 "content": "4.3.2.1",
319 "disabled": False,
320 }],
321 }
322 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
323 # check our record has appeared
324 self.assertEqual(get_rrset(data, name, 'A')['records'], rrset['records'])
325
326 def test_create_zone_with_wildcard_records(self):
327 name = unique_zone_name()
328 rrset = {
329 "name": "*."+name,
330 "type": "A",
331 "ttl": 3600,
332 "records": [{
333 "content": "4.3.2.1",
334 "disabled": False,
335 }],
336 }
337 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
338 # check our record has appeared
339 self.assertEqual(get_rrset(data, rrset['name'], 'A')['records'], rrset['records'])
340
341 def test_create_zone_with_comments(self):
342 name = unique_zone_name()
343 rrsets = [
344 {
345 "name": name,
346 "type": "soa", # test uppercasing of type, too.
347 "comments": [{
348 "account": "test1",
349 "content": "blah blah and test a few non-ASCII chars: ö, €",
350 "modified_at": 11112,
351 }],
352 },
353 {
354 "name": name,
355 "type": "AAAA",
356 "ttl": 3600,
357 "records": [{
358 "content": "2001:DB8::1",
359 "disabled": False,
360 }],
361 "comments": [{
362 "account": "test AAAA",
363 "content": "blah blah AAAA",
364 "modified_at": 11112,
365 }],
366 },
367 {
368 "name": name,
369 "type": "TXT",
370 "ttl": 3600,
371 "records": [{
372 "content": "\"test TXT\"",
373 "disabled": False,
374 }],
375 },
376 {
377 "name": name,
378 "type": "A",
379 "ttl": 3600,
380 "records": [{
381 "content": "192.0.2.1",
382 "disabled": False,
383 }],
384 },
385 ]
386
387 if is_auth_lmdb():
388 # No comments in LMDB
389 self.create_zone(name=name, rrsets=rrsets, expect_error="Hosting backend does not support editing comments.")
390 return
391
392 name, _, data = self.create_zone(name=name, rrsets=rrsets)
393 # NS records have been created
394 self.assertEqual(len(data['rrsets']), len(rrsets) + 1)
395 # check our comment has appeared
396 self.assertEqual(get_rrset(data, name, 'SOA')['comments'], rrsets[0]['comments'])
397 self.assertEqual(get_rrset(data, name, 'A')['comments'], [])
398 self.assertEqual(get_rrset(data, name, 'TXT')['comments'], [])
399 self.assertEqual(get_rrset(data, name, 'AAAA')['comments'], rrsets[1]['comments'])
400
401 def test_create_zone_uncanonical_nameservers(self):
402 name = unique_zone_name()
403 payload = {
404 'name': name,
405 'kind': 'Native',
406 'nameservers': ['uncanon.example.com']
407 }
408 print(payload)
409 r = self.session.post(
410 self.url("/api/v1/servers/localhost/zones"),
411 data=json.dumps(payload),
412 headers={'content-type': 'application/json'})
413 self.assertEqual(r.status_code, 422)
414 self.assertIn('Nameserver is not canonical', r.json()['error'])
415
416 def test_create_auth_zone_no_name(self):
417 name = unique_zone_name()
418 payload = {
419 'name': '',
420 'kind': 'Native',
421 }
422 print(payload)
423 r = self.session.post(
424 self.url("/api/v1/servers/localhost/zones"),
425 data=json.dumps(payload),
426 headers={'content-type': 'application/json'})
427 self.assertEqual(r.status_code, 422)
428 self.assertIn('is not canonical', r.json()['error'])
429
430 def test_create_zone_with_custom_soa(self):
431 name = unique_zone_name()
432 content = u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
433 rrset = {
434 "name": name,
435 "type": "soa", # test uppercasing of type, too.
436 "ttl": 3600,
437 "records": [{
438 "content": content,
439 "disabled": False,
440 }],
441 }
442 name, payload, data = self.create_zone(name=name, rrsets=[rrset], soa_edit_api='')
443 self.assertEqual(get_rrset(data, name, 'SOA')['records'], rrset['records'])
444 if not is_auth_lmdb():
445 dbrecs = get_db_records(name, 'SOA')
446 self.assertEqual(dbrecs[0]['content'], content.replace('. ', ' '))
447
448 def test_create_zone_double_dot(self):
449 name = 'test..' + unique_zone_name()
450 payload = {
451 'name': name,
452 'kind': 'Native',
453 'nameservers': ['ns1.example.com.']
454 }
455 print(payload)
456 r = self.session.post(
457 self.url("/api/v1/servers/localhost/zones"),
458 data=json.dumps(payload),
459 headers={'content-type': 'application/json'})
460 self.assertEqual(r.status_code, 422)
461 self.assertIn('Unable to parse Zone Name', r.json()['error'])
462
463 def test_create_zone_restricted_chars(self):
464 name = 'test:' + unique_zone_name() # : isn't good as a name.
465 payload = {
466 'name': name,
467 'kind': 'Native',
468 'nameservers': ['ns1.example.com']
469 }
470 print(payload)
471 r = self.session.post(
472 self.url("/api/v1/servers/localhost/zones"),
473 data=json.dumps(payload),
474 headers={'content-type': 'application/json'})
475 self.assertEqual(r.status_code, 422)
476 self.assertIn('contains unsupported characters', r.json()['error'])
477
478 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self):
479 name = unique_zone_name()
480 rrset = {
481 "name": name,
482 "type": "NS",
483 "ttl": 3600,
484 "records": [{
485 "content": "ns2.example.com.",
486 "disabled": False,
487 }],
488 }
489 payload = {
490 'name': name,
491 'kind': 'Native',
492 'nameservers': ['ns1.example.com.'],
493 'rrsets': [rrset],
494 }
495 print(payload)
496 r = self.session.post(
497 self.url("/api/v1/servers/localhost/zones"),
498 data=json.dumps(payload),
499 headers={'content-type': 'application/json'})
500 self.assertEqual(r.status_code, 422)
501 self.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r.json()['error'])
502
503 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self):
504 name = unique_zone_name()
505 rrset = {
506 "name": 'subzone.'+name,
507 "type": "NS",
508 "ttl": 3600,
509 "records": [{
510 "content": "ns2.example.com.",
511 "disabled": False,
512 }],
513 }
514 payload = {
515 'name': name,
516 'kind': 'Native',
517 'nameservers': ['ns1.example.com.'],
518 'rrsets': [rrset],
519 }
520 print(payload)
521 r = self.session.post(
522 self.url("/api/v1/servers/localhost/zones"),
523 data=json.dumps(payload),
524 headers={'content-type': 'application/json'})
525 self.assert_success_json(r)
526
527 def test_create_zone_with_symbols(self):
528 name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
529 name = payload['name']
530 expected_id = name.replace('/', '=2F')
531 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
532 self.assertIn(k, data)
533 if k in payload:
534 self.assertEqual(data[k], payload[k])
535 self.assertEqual(data['id'], expected_id)
536 if not is_auth_lmdb():
537 dbrecs = get_db_records(name, 'SOA')
538 self.assertEqual(dbrecs[0]['name'], name.rstrip('.'))
539
540 def test_create_zone_with_nameservers_non_string(self):
541 # ensure we don't crash
542 name = unique_zone_name()
543 payload = {
544 'name': name,
545 'kind': 'Native',
546 'nameservers': [{'a': 'ns1.example.com'}] # invalid
547 }
548 print(payload)
549 r = self.session.post(
550 self.url("/api/v1/servers/localhost/zones"),
551 data=json.dumps(payload),
552 headers={'content-type': 'application/json'})
553 self.assertEqual(r.status_code, 422)
554
555 def test_create_zone_with_dnssec(self):
556 """
557 Create a zone with "dnssec" set and see if a key was made.
558 """
559 name = unique_zone_name()
560 name, payload, data = self.create_zone(dnssec=True)
561
562 self.get_zone(name)
563
564 for k in ('dnssec', ):
565 self.assertIn(k, data)
566 if k in payload:
567 self.assertEqual(data[k], payload[k])
568
569 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
570
571 keys = r.json()
572
573 print(keys)
574
575 self.assertEqual(r.status_code, 200)
576 self.assertEqual(len(keys), 1)
577 self.assertEqual(keys[0]['type'], 'Cryptokey')
578 self.assertEqual(keys[0]['active'], True)
579 self.assertEqual(keys[0]['keytype'], 'csk')
580
581 def test_create_zone_with_dnssec_disable_dnssec(self):
582 """
583 Create a zone with "dnssec", then set "dnssec" to false and see if the
584 keys are gone
585 """
586 name = unique_zone_name()
587 name, payload, data = self.create_zone(dnssec=True)
588
589 self.put_zone(name, {'dnssec': False})
590
591 zoneinfo = self.get_zone(name)
592 self.assertEqual(zoneinfo['dnssec'], False)
593
594 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
595
596 keys = r.json()
597
598 self.assertEqual(r.status_code, 200)
599 self.assertEqual(len(keys), 0)
600
601 def test_create_zone_with_nsec3param(self):
602 """
603 Create a zone with "nsec3param" set and see if the metadata was added.
604 """
605 name = unique_zone_name()
606 nsec3param = '1 0 100 aabbccddeeff'
607 name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)
608
609 self.get_zone(name)
610
611 for k in ('dnssec', 'nsec3param'):
612 self.assertIn(k, data)
613 if k in payload:
614 self.assertEqual(data[k], payload[k])
615
616 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3PARAM'))
617
618 data = r.json()
619
620 print(data)
621
622 self.assertEqual(r.status_code, 200)
623 self.assertEqual(len(data['metadata']), 1)
624 self.assertEqual(data['kind'], 'NSEC3PARAM')
625 self.assertEqual(data['metadata'][0], nsec3param)
626
627 def test_create_zone_with_nsec3narrow(self):
628 """
629 Create a zone with "nsec3narrow" set and see if the metadata was added.
630 """
631 name = unique_zone_name()
632 nsec3param = '1 0 100 aabbccddeeff'
633 name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
634 nsec3narrow=True)
635
636 self.get_zone(name)
637
638 for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
639 self.assertIn(k, data)
640 if k in payload:
641 self.assertEqual(data[k], payload[k])
642
643 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3NARROW'))
644
645 data = r.json()
646
647 print(data)
648
649 self.assertEqual(r.status_code, 200)
650 self.assertEqual(len(data['metadata']), 1)
651 self.assertEqual(data['kind'], 'NSEC3NARROW')
652 self.assertEqual(data['metadata'][0], '1')
653
654 def test_create_zone_with_nsec3param_switch_to_nsec(self):
655 """
656 Create a zone with "nsec3param", then remove the params
657 """
658 name, payload, data = self.create_zone(dnssec=True,
659 nsec3param='1 0 1 ab')
660 self.put_zone(name, {'nsec3param': ''})
661
662 data = self.get_zone(name)
663 self.assertEqual(data['nsec3param'], '')
664
665 def test_create_zone_without_dnssec_unset_nsec3parm(self):
666 """
667 Create a non dnssec zone and set an empty "nsec3param"
668 """
669 name, payload, data = self.create_zone(dnssec=False)
670 self.put_zone(name, {'nsec3param': ''})
671
672 def test_create_zone_without_dnssec_set_nsec3parm(self):
673 """
674 Create a non dnssec zone and set "nsec3param"
675 """
676 name, payload, data = self.create_zone(dnssec=False)
677 self.put_zone(name, {'nsec3param': '1 0 1 ab'}, expect_error=True)
678
679 def test_create_zone_dnssec_serial(self):
680 """
681 Create a zone, then set and unset "dnssec", then check if the serial was increased
682 after every step
683 """
684 name, payload, data = self.create_zone()
685
686 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
687 self.assertEqual(soa_serial[-2:], '01')
688
689 self.put_zone(name, {'dnssec': True})
690
691 data = self.get_zone(name)
692 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
693 self.assertEqual(soa_serial[-2:], '02')
694
695 self.put_zone(name, {'dnssec': False})
696
697 data = self.get_zone(name)
698 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
699 self.assertEqual(soa_serial[-2:], '03')
700
701 def test_zone_absolute_url(self):
702 self.create_zone()
703 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
704 rdata = r.json()
705 print(rdata[0])
706 self.assertTrue(rdata[0]['url'].startswith('/api/v'))
707
708 def test_create_zone_metadata(self):
709 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
710 r = self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
711 data=json.dumps(payload_metadata))
712 rdata = r.json()
713 self.assertEqual(r.status_code, 201)
714 self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
715
716 def test_create_zone_metadata_kind(self):
717 payload_metadata = {"metadata": ["127.0.0.2"]}
718 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
719 data=json.dumps(payload_metadata))
720 rdata = r.json()
721 self.assertEqual(r.status_code, 200)
722 self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
723
724 def test_create_protected_zone_metadata(self):
725 # test whether it prevents modification of certain kinds
726 for k in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
727 payload = {"metadata": ["FOO", "BAR"]}
728 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k),
729 data=json.dumps(payload))
730 self.assertEqual(r.status_code, 422)
731
732 def test_retrieve_zone_metadata(self):
733 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
734 self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
735 data=json.dumps(payload_metadata))
736 r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata"))
737 rdata = r.json()
738 self.assertEqual(r.status_code, 200)
739 self.assertIn(payload_metadata, rdata)
740
741 def test_delete_zone_metadata(self):
742 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
743 self.assertEqual(r.status_code, 204)
744 r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
745 rdata = r.json()
746 self.assertEqual(r.status_code, 200)
747 self.assertEqual(rdata["metadata"], [])
748
749 def test_create_external_zone_metadata(self):
750 payload_metadata = {"metadata": ["My very important message"]}
751 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
752 data=json.dumps(payload_metadata))
753 self.assertEqual(r.status_code, 200)
754 rdata = r.json()
755 self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
756
757 def test_create_metadata_in_non_existent_zone(self):
758 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
759 r = self.session.post(self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
760 data=json.dumps(payload_metadata))
761 self.assertEqual(r.status_code, 404)
762 # Note: errors should probably contain json (see #5988)
763 # self.assertIn('Could not find domain ', r.json()['error'])
764
765 def test_create_slave_zone(self):
766 # Test that nameservers can be absent for slave zones.
767 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
768 for k in ('name', 'masters', 'kind'):
769 self.assertIn(k, data)
770 self.assertEqual(data[k], payload[k])
771 print("payload:", payload)
772 print("data:", data)
773 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
774 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
775 zonelist = r.json()
776 print("zonelist:", zonelist)
777 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
778 # Also test that fetching the zone works.
779 data = self.get_zone(data['id'])
780 print("zone (fetched):", data)
781 for k in ('name', 'masters', 'kind'):
782 self.assertIn(k, data)
783 self.assertEqual(data[k], payload[k])
784 self.assertEqual(data['serial'], 0)
785 self.assertEqual(data['rrsets'], [])
786
787 def test_create_consumer_zone(self):
788 # Test that nameservers can be absent for consumer zones.
789 _, payload, data = self.create_zone(kind='Consumer', nameservers=None, masters=['127.0.0.2'])
790 print("payload:", payload)
791 print("data:", data)
792 # Because consumer zones don't get a SOA, we need to test that they'll show up in the zone list.
793 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
794 zonelist = r.json()
795 print("zonelist:", zonelist)
796 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
797 # Also test that fetching the zone works.
798 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
799 data = r.json()
800 print("zone (fetched):", data)
801 for k in ('name', 'masters', 'kind'):
802 self.assertIn(k, data)
803 self.assertEqual(data[k], payload[k])
804 self.assertEqual(data['serial'], 0)
805 self.assertEqual(data['rrsets'], [])
806
807 def test_create_consumer_zone_no_nameservers(self):
808 """nameservers must be absent for Consumer zones"""
809 self.create_zone(kind="Consumer", nameservers=["127.0.0.1"], expect_error="Nameservers MUST NOT be given for Consumer zones")
810
811 def test_create_consumer_zone_no_rrsets(self):
812 """rrsets must be absent for Consumer zones"""
813 rrsets = [{
814 "name": "$NAME$",
815 "type": "SOA",
816 "ttl": 3600,
817 "records": [{
818 "content": "ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600",
819 "disabled": False,
820 }],
821 }]
822 self.create_zone(kind="Consumer", nameservers=None, rrsets=rrsets, expect_error="Zone data MUST NOT be given for Consumer zones")
823
824 def test_find_zone_by_name(self):
825 name = 'foo/' + unique_zone_name()
826 name, payload, data = self.create_zone(name=name)
827 r = self.session.get(self.url("/api/v1/servers/localhost/zones?zone=" + name))
828 data = r.json()
829 print(data)
830 self.assertEqual(data[0]['name'], name)
831
832 def test_delete_slave_zone(self):
833 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
834 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + data['id']))
835 r.raise_for_status()
836
837 def test_delete_consumer_zone(self):
838 name, payload, data = self.create_zone(kind='Consumer', nameservers=None, masters=['127.0.0.2'])
839 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + data['id']))
840 r.raise_for_status()
841
842 def test_retrieve_slave_zone(self):
843 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
844 print("payload:", payload)
845 print("data:", data)
846 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/axfr-retrieve"))
847 data = r.json()
848 print("status for axfr-retrieve:", data)
849 self.assertEqual(data['result'], u'Added retrieval request for \'' + payload['name'] +
850 '\' from primary 127.0.0.2')
851
852 def test_notify_master_zone(self):
853 name, payload, data = self.create_zone(kind='Master')
854 print("payload:", payload)
855 print("data:", data)
856 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/notify"))
857 data = r.json()
858 print("status for notify:", data)
859 self.assertEqual(data['result'], 'Notification queued')
860
861 def test_get_zone_with_symbols(self):
862 name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
863 name = payload['name']
864 zone_id = (name.replace('/', '=2F'))
865 data = self.get_zone(zone_id)
866 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
867 self.assertIn(k, data)
868 if k in payload:
869 self.assertEqual(data[k], payload[k])
870
871 def test_get_zone(self):
872 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
873 domains = r.json()
874 example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
875 data = self.get_zone(example_com['id'])
876 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
877 self.assertIn(k, data)
878 self.assertEqual(data['name'], 'example.com.')
879
def test_get_zone_rrset(self):
    """Check zone GETs filtered by rrset_name, rrset_type and include_disabled."""
    name = 'host-18000.example.com.'
    zone_keys = ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets')
    domains = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
    example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]

    # A name that holds a single record must come back as exactly one RRset.
    data = self.get_zone(example_com['id'], rrset_name=name)
    for key in zone_keys:
        self.assertIn(key, data)
    self.assertEqual(data['rrsets'], [{
        'comments': [],
        'name': name,
        'records': [{'content': '192.168.1.80', 'disabled': False}],
        'ttl': 120,
        'type': 'A',
    }])

    # Replace that record with a disabled copy of itself.
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 120,
        'records': [{'content': '192.168.1.80', 'disabled': True}],
    }
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/example.com"),
        data=json.dumps({'rrsets': [rrset]}),
        headers={'content-type': 'application/json'})
    self.assert_success(r)

    # With include_disabled=false the disabled record must not be returned.
    data = self.get_zone(example_com['id'], rrset_name=name, include_disabled="false")
    self.assertEqual(len(data['rrsets']), 0)

    # It is returned when disabled records are requested explicitly, and by default.
    for extra_args in ({'include_disabled': "true"}, {}):
        data = self.get_zone(example_com['id'], rrset_name=name, **extra_args)
        self.assertEqual(get_rrset(data, name, 'A')['records'], rrset['records'])

    # A name with two types (one record each) yields two RRsets.
    powerdnssec_org = [domain for domain in domains if domain['name'] == u'powerdnssec.org.'][0]
    localhost_name = 'localhost.powerdnssec.org.'
    expected_a = {
        'comments': [],
        'name': localhost_name,
        'records': [{'content': '127.0.0.1', 'disabled': False}],
        'ttl': 3600,
        'type': 'A',
    }
    expected_aaaa = {
        'comments': [],
        'name': localhost_name,
        'records': [{'content': '::1', 'disabled': False}],
        'ttl': 3600,
        'type': 'AAAA',
    }
    data = self.get_zone(powerdnssec_org['id'], rrset_name=localhost_name)
    for key in zone_keys:
        self.assertIn(key, data)
    by_type = sorted(data['rrsets'], key=operator.itemgetter('type'))
    self.assertEqual(by_type, [expected_a, expected_aaaa])

    # Filtering the same name by type leaves only the matching RRset.
    data = self.get_zone(powerdnssec_org['id'], rrset_name=localhost_name, rrset_type="AAAA")
    for key in zone_keys:
        self.assertIn(key, data)
    self.assertEqual(data['rrsets'], [expected_aaaa])
998
def test_import_zone_broken(self):
    """POST a zone whose zone text is malformed and expect HTTP 422."""
    # The second line of the text lacks the leading ';;' comment marker.
    # NOTE(review): 'name' has no trailing dot here; if the API rejects that
    # on its own, the 422 may not come from the broken zone text - confirm.
    zone_text = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1680
;; QUESTION SECTION:
;powerdns.com. IN SOA

;; ANSWER SECTION:
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
powerdns-broken.com. 86400 IN A 82.94.213.34
powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
    request_body = {
        'name': 'powerdns-broken.com',
        'kind': 'Master',
        'nameservers': [],
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
1029
def test_import_zone_axfr_outofzone(self):
    """Importing zone text that contains an out-of-zone record must fail with 422."""
    zone_name = unique_zone_name()
    # The example.org. AAAA record does not belong to the zone being created.
    zone_text = """
%NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
%NAME% 3600 IN NS powerdnssec2.ds9a.nl.
example.org. 3600 IN AAAA 2001:888:2000:1d::2
%NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
""".replace('%NAME%', zone_name)
    request_body = {
        'name': zone_name,
        'kind': 'Master',
        'nameservers': [],
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertEqual(response.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
1049
def test_import_zone_axfr(self):
    """Create powerdns.com. from dig/AXFR-style text and compare the resulting rrsets."""
    zone_text = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1680
;; QUESTION SECTION:
;powerdns.com. IN SOA

;; ANSWER SECTION:
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
powerdns.com. 86400 IN A 82.94.213.34
powerdns.com. 3600 IN MX 0 xs.powerdns.com.
powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
    payload = {
        'name': 'powerdns.com.',
        'kind': 'Master',
        'nameservers': [],
        'soa_edit_api': '',  # turn off so exact SOA comparison works.
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
    created = response.json()
    self.assertIn('name', created)

    expected_by_type = {
        'NS': [
            {'content': 'powerdnssec1.ds9a.nl.'},
            {'content': 'powerdnssec2.ds9a.nl.'},
        ],
        'SOA': [
            {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
        ],
        'MX': [
            {'content': '0 xs.powerdns.com.'},
        ],
        'A': [
            {'content': '82.94.213.34', 'name': 'powerdns.com.'},
        ],
        'AAAA': [
            {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
        ],
    }
    eq_zone_rrsets(created['rrsets'], expected_by_type)

    if is_auth_lmdb():
        return
    # check content in DB is stored WITHOUT trailing dot.
    ns_rows = get_db_records(payload['name'], 'NS')
    first_match = next(row for row in ns_rows if row['content'].startswith('powerdnssec1'))
    self.assertEqual(first_match['content'], 'powerdnssec1.ds9a.nl')
1110
def test_import_zone_bind(self):
    """Create example.org. from BIND-style zone text and compare the resulting rrsets."""
    zone_text = """
$TTL 86400 ; 24 hours could have been written as 24h or 1d
; $TTL used for all RRs without explicit TTL value
$ORIGIN example.org.
@ 1D IN SOA ns1.example.org. hostmaster.example.org. (
 2002022401 ; serial
 3H ; refresh
 15 ; retry
 1w ; expire
 3h ; minimum
 )
 IN NS ns1.example.org. ; in the domain
 IN NS ns2.smokeyjoe.com. ; external to domain
 IN MX 10 mail.another.com. ; external mail provider
; server host definitions
ns1 IN A 192.168.0.1 ;name server definition
www IN A 192.168.0.2 ;web server definition
ftp IN CNAME www.example.org. ;ftp server definition
; non server domain hosts
bill IN A 192.168.0.3
fred IN A 192.168.0.4
"""
    request_body = {
        'name': 'example.org.',
        'kind': 'Master',
        'nameservers': [],
        'soa_edit_api': '',  # turn off so exact SOA comparison works.
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
    created = response.json()
    self.assertIn('name', created)

    expected_by_type = {
        'NS': [
            {'content': 'ns1.example.org.'},
            {'content': 'ns2.smokeyjoe.com.'},
        ],
        'SOA': [
            {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
        ],
        'MX': [
            {'content': '10 mail.another.com.'},
        ],
        'A': [
            {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
            {'content': '192.168.0.2', 'name': 'www.example.org.'},
            {'content': '192.168.0.3', 'name': 'bill.example.org.'},
            {'content': '192.168.0.4', 'name': 'fred.example.org.'},
        ],
        'CNAME': [
            {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
        ],
    }
    eq_zone_rrsets(created['rrsets'], expected_by_type)
1171
def test_import_zone_bind_cname_apex(self):
    """Zone text that puts a CNAME at the apex next to SOA/NS must be rejected with 422."""
    zone_name = unique_zone_name()
    zone_text = """
$ORIGIN %NAME%
@ IN SOA ns1.example.org. hostmaster.example.org. (2002022401 3H 15 1W 3H)
@ IN NS ns1.example.org.
@ IN NS ns2.smokeyjoe.com.
@ IN CNAME www.example.org.
""".replace('%NAME%', zone_name)
    request_body = {
        'name': zone_name,
        'kind': 'Master',
        'nameservers': [],
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Conflicts with another RRset', response.json()['error'])
1191
def test_export_zone_json(self):
    """Export a fresh zone with a JSON-preferring Accept header and compare the zone lines."""
    name, _, _ = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    export_url = self.url("/api/v1/servers/localhost/zones/" + name + "/export")
    response = self.session.get(
        export_url,
        headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
    )
    self.assert_success_json(response)
    exported = response.json()
    self.assertIn('zone', exported)
    # Two NS lines plus the SOA line; order is not significant.
    expected_lines = [name + '\t3600\tIN\tNS\t' + ns for ns in ('ns1.foo.com.', 'ns2.foo.com.')]
    expected_lines.append(
        name + '\t3600\tIN\tSOA\ta.misconfigured.dns.server.invalid. hostmaster.' + name +
        ' 0 10800 3600 604800 3600')
    actual_lines = exported['zone'].strip().split('\n')
    self.assertCountEqual(actual_lines, expected_lines)
1207
def test_import_zone_consumer(self):
    """Creating a Consumer zone with zone text supplied must produce the expected error."""
    zone_text = """
$NAME$ 1D IN SOA ns1.example.org. hostmaster.example.org. (
 2002022401 ; serial
 3H ; refresh
 15 ; retry
 1w ; expire
 3h ; minimum
 )
 """
    self.create_zone(
        kind="Consumer",
        nameservers=[],
        zone=zone_text,
        expect_error="Zone data MUST NOT be given for Consumer zones")
1219
def test_export_zone_text(self):
    """Export a fresh zone as plain text and compare its lines.

    The zone is created with two nameservers and SOA-EDIT-API turned off,
    so the exported text is expected to be exactly two NS lines and one SOA
    line (compared without regard to order).
    """
    name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    r = self.session.get(
        self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
        headers={'accept': '*/*'}
    )
    # Check the request succeeded before parsing the body, as the JSON export
    # test does; otherwise an error response would surface only as a confusing
    # line-by-line mismatch below.
    self.assert_success(r)
    data = r.text.strip().split("\n")
    expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
                     name + '\t3600\tIN\tNS\tns2.foo.com.',
                     name + '\t3600\tIN\tSOA\ta.misconfigured.dns.server.invalid. hostmaster.' + name +
                     ' 0 10800 3600 604800 3600']
    self.assertCountEqual(data, expected_data)
1233
def test_update_zone(self):
    """PUT zone settings twice and check each update is reflected by a following GET."""
    _, created_payload, _ = self.create_zone()
    name = created_payload['name']
    updates = (
        # update, set as Master and enable SOA-EDIT-API
        {
            'kind': 'Master',
            'masters': ['192.0.2.1', '192.0.2.2'],
            'catalog': 'catalog.invalid.',
            'soa_edit_api': 'EPOCH',
            'soa_edit': 'EPOCH'
        },
        # update, back to Native and empty(off)
        {
            'kind': 'Native',
            'catalog': '',
            'soa_edit_api': '',
            'soa_edit': ''
        },
    )
    for update in updates:
        self.put_zone(name, update)
        current = self.get_zone(name)
        for key, value in update.items():
            self.assertIn(key, current)
            self.assertEqual(current[key], value)
1262
def test_zone_rr_update(self):
    """Replace the NS rrset (type sent in lowercase) and check the stored records."""
    name, _, _ = self.create_zone()
    new_records = [
        {"content": "ns1.bar.com.", "disabled": False},
        {"content": "ns2-disabled.bar.com.", "disabled": True},
    ]
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'ns',
        'ttl': 3600,
        'records': new_records,
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [rrset]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # verify that (only) the new records are there
    zone_data = self.get_zone(name)
    self.assertCountEqual(get_rrset(zone_data, name, 'NS')['records'], rrset['records'])
1291
def test_zone_rr_update_mx(self):
    """Replace an MX rrset and check the records read back match what was sent."""
    # Important to test with MX records, as they have a priority field, which must end up in the content field.
    name, _, _ = self.create_zone()
    mx_rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'MX',
        'ttl': 3600,
        'records': [{"content": "10 mail.example.org.", "disabled": False}],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [mx_rrset]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # verify that (only) the new record is there
    zone_data = self.get_zone(name)
    self.assertEqual(get_rrset(zone_data, name, 'MX')['records'], mx_rrset['records'])
1317
def test_zone_rr_update_invalid_mx(self):
    """An MX record whose target is not a hostname is rejected with 422 and not stored."""
    name, _, _ = self.create_zone()
    bad_mx = {
        'changetype': 'replace',
        'name': name,
        'type': 'MX',
        'ttl': 3600,
        'records': [{"content": "10 mail@mx.example.org.", "disabled": False}],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [bad_mx]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('non-hostname content', response.json()['error'])
    # The rejected rrset must not show up in the zone.
    zone_data = self.get_zone(name)
    self.assertIsNone(get_rrset(zone_data, name, 'MX'))
1342
def test_zone_rr_update_opt(self):
    """Submitting an rrset of type OPT is rejected with 422 ('OPT: invalid type given')."""
    name, _, _ = self.create_zone()
    opt_rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'OPT',
        'ttl': 3600,
        'records': [{"content": "9", "disabled": False}],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [opt_rrset]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('OPT: invalid type given', response.json()['error'])
1365
def test_zone_rr_update_multiple_rrsets(self):
    """Replace NS and MX rrsets in one PATCH and check both were applied."""
    name, _, _ = self.create_zone()

    def replacement(qtype, content):
        # Build a single-record 'replace' rrset at the zone name.
        return {
            'changetype': 'replace',
            'name': name,
            'type': qtype,
            'ttl': 3600,
            'records': [{"content": content, "disabled": False}],
        }

    ns_rrset = replacement('NS', "ns9999.example.com.")
    mx_rrset = replacement('MX', "10 mx444.example.com.")
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [ns_rrset, mx_rrset]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # verify that all rrsets have been updated
    zone_data = self.get_zone(name)
    self.assertEqual(get_rrset(zone_data, name, 'NS')['records'], ns_rrset['records'])
    self.assertEqual(get_rrset(zone_data, name, 'MX')['records'], mx_rrset['records'])
1403
def test_zone_rr_update_duplicate_record(self):
    """An rrset listing the same record content twice is rejected with 422."""
    name, _, _ = self.create_zone()
    # ns9999 appears both first and last.
    targets = (
        "ns9999.example.com.",
        "ns9996.example.com.",
        "ns9987.example.com.",
        "ns9988.example.com.",
        "ns9999.example.com.",
    )
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [{"content": target, "disabled": False} for target in targets],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [rrset]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Duplicate record in RRset', response.json()['error'])
1426
def test_zone_rr_update_duplicate_rrset(self):
    """Two rrsets with the same name and type in one PATCH are rejected with 422."""
    name, _, _ = self.create_zone()
    # Both rrsets target the zone's NS set; only the record content differs.
    rrsets = [
        {
            'changetype': 'replace',
            'name': name,
            'type': 'NS',
            'ttl': 3600,
            'records': [{"content": target, "disabled": False}],
        }
        for target in ("ns9999.example.com.", "ns9998.example.com.")
    ]
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': rrsets}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Duplicate RRset', response.json()['error'])
1460
def test_zone_rr_delete(self):
    """Delete the zone's NS rrset and check it no longer appears in the zone."""
    name, _, _ = self.create_zone()
    # do a delete of all NS records (these are created with the zone)
    deletion = {'changetype': 'delete', 'name': name, 'type': 'NS'}
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [deletion]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # verify that the records are gone
    zone_data = self.get_zone(name)
    self.assertIsNone(get_rrset(zone_data, name, 'NS'))
1478
def test_zone_rr_update_rrset_combine_replace_and_delete(self):
    """A delete followed by a replace of the same name/type in one PATCH succeeds."""
    name, _, _ = self.create_zone()
    owner = 'sub.' + name
    delete_op = {
        'changetype': 'delete',
        'name': owner,
        'type': 'CNAME',
    }
    replace_op = {
        'changetype': 'replace',
        'name': owner,
        'type': 'CNAME',
        'ttl': 500,
        'records': [{"content": "www.example.org.", "disabled": False}],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [delete_op, replace_op]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # verify that (only) the new record is there
    zone_data = self.get_zone(name)
    self.assertEqual(get_rrset(zone_data, owner, 'CNAME')['records'], replace_op['records'])
1507
def test_zone_disable_reenable(self):
    """Disable and re-enable the SOA record, checking the serial is rewritten each time."""
    # This also tests that SOA-EDIT-API works.
    name, _, _ = self.create_zone(soa_edit_api='EPOCH')
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    json_headers = {'content-type': 'application/json'}
    # disable zone by disabling SOA
    soa_rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'SOA',
        'ttl': 3600,
        'records': [
            {"content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1", "disabled": True},
        ],
    }
    response = self.session.patch(zone_url, data=json.dumps({'rrsets': [soa_rrset]}), headers=json_headers)
    self.assert_success(response)
    # check SOA serial has been edited (it was submitted as 1)
    zone_data = self.get_zone(name)
    first_serial = get_first_rec(zone_data, name, 'SOA')['content'].split()[2]
    self.assertNotEqual(first_serial, '1')
    # make sure domain is still in zone list (disabled SOA!)
    listing = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
    listed = [entry for entry in listing if entry['name'] == name]
    self.assertEqual(len(listed), 1)
    # sleep 1sec to ensure the EPOCH value changes for the next request
    time.sleep(1)
    # verify that modifying it still works
    soa_rrset['records'][0]['disabled'] = False
    response = self.session.patch(zone_url, data=json.dumps({'rrsets': [soa_rrset]}), headers=json_headers)
    self.assert_success(response)
    # check SOA serial has been edited again
    zone_data = self.get_zone(name)
    second_serial = get_first_rec(zone_data, name, 'SOA')['content'].split()[2]
    self.assertNotEqual(second_serial, '1')
    self.assertNotEqual(second_serial, first_serial)
1553
def test_zone_rr_update_out_of_zone(self):
    """A replace for a name outside the zone is rejected with 422 ('out of zone')."""
    name, _, _ = self.create_zone()
    # The rrset name does not belong to the zone being patched.
    foreign_rrset = {
        'changetype': 'replace',
        'name': 'not-in-zone.',
        'type': 'NS',
        'ttl': 3600,
        'records': [{"content": "ns1.bar.com.", "disabled": False}],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [foreign_rrset]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('out of zone', response.json()['error'])
1576
def test_zone_rr_update_restricted_chars(self):
    """A replace whose name contains ':' is rejected with 422 (unsupported characters)."""
    name, _, _ = self.create_zone()
    # The rrset name is inside the zone but carries a ':' in its first label.
    odd_rrset = {
        'changetype': 'replace',
        'name': 'test:' + name,
        'type': 'NS',
        'ttl': 3600,
        'records': [{"content": "ns1.bar.com.", "disabled": False}],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [odd_rrset]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('contains unsupported characters', response.json()['error'])
1599
def test_rrset_unknown_type(self):
    """An rrset with the made-up type FAFAFA is rejected with 422 ('unknown type')."""
    name, _, _ = self.create_zone()
    bogus_rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'FAFAFA',
        'ttl': 3600,
        'records': [{"content": "4.3.2.1", "disabled": False}],
    }
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    response = self.session.patch(
        zone_url,
        data=json.dumps({'rrsets': [bogus_rrset]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('unknown type', response.json()['error'])
1619
@parameterized.expand([
    ('CNAME', ),
])
def test_rrset_exclusive_and_other(self, qtype):
    """Adding the given type at the zone name of a freshly created zone is rejected with 422."""
    name, _, _ = self.create_zone()
    exclusive_rrset = {
        'changetype': 'replace',
        'name': name,
        'type': qtype,
        'ttl': 3600,
        'records': [{"content": "example.org.", "disabled": False}],
    }
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    response = self.session.patch(
        zone_url,
        data=json.dumps({'rrsets': [exclusive_rrset]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Conflicts with pre-existing RRset', response.json()['error'])
1642
@parameterized.expand([
    ('CNAME', ),
])
def test_rrset_other_and_exclusive(self, qtype):
    """After storing the given type at sub.<zone>, adding an A rrset there is rejected with 422."""
    name, _, _ = self.create_zone()
    owner = 'sub.'+name
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    json_headers = {'content-type': 'application/json'}

    # First request: the exclusive type on its own is accepted.
    exclusive_rrset = {
        'changetype': 'replace',
        'name': owner,
        'type': qtype,
        'ttl': 3600,
        'records': [{"content": "example.org.", "disabled": False}],
    }
    response = self.session.patch(zone_url, data=json.dumps({'rrsets': [exclusive_rrset]}),
                                  headers=json_headers)
    self.assert_success(response)

    # Second request: another type at the same name must now conflict.
    a_rrset = {
        'changetype': 'replace',
        'name': owner,
        'type': 'A',
        'ttl': 3600,
        'records': [{"content": "1.2.3.4", "disabled": False}],
    }
    response = self.session.patch(zone_url, data=json.dumps({'rrsets': [a_rrset]}),
                                  headers=json_headers)
    self.assertEqual(response.status_code, 422)
    self.assertIn('Conflicts with pre-existing RRset', response.json()['error'])
1681
@parameterized.expand([
    ('', 'SOA', ['ns1.example.org. test@example.org. 10 10800 3600 604800 3600', 'ns2.example.org. test@example.org. 10 10800 3600 604800 3600']),
    ('sub.', 'CNAME', ['01.example.org.', '02.example.org.']),
])
def test_rrset_single_qtypes(self, label, qtype, contents):
    """Submitting two records for the given type is rejected with 422."""
    name, _, _ = self.create_zone()
    # Only the first two entries of contents are submitted.
    two_records = [{"content": text, "disabled": False} for text in (contents[0], contents[1])]
    rrset = {
        'changetype': 'replace',
        'name': label + name,
        'type': qtype,
        'ttl': 3600,
        'records': two_records,
    }
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    response = self.session.patch(
        zone_url,
        data=json.dumps({'rrsets': [rrset]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('IN ' + qtype + ' has more than one record', response.json()['error'])
1709
def test_rrset_zone_apex(self):
    """Replacing SOA and adding a DNAME at the zone name in one PATCH succeeds."""
    name, _, _ = self.create_zone()

    def apex_rrset(qtype, content):
        # Build a single-record 'replace' rrset at the zone name.
        return {
            'changetype': 'replace',
            'name': name,
            'type': qtype,
            'ttl': 3600,
            'records': [{"content": content, "disabled": False}],
        }

    soa_rrset = apex_rrset('SOA', 'ns1.example.org. test@example.org. 10 10800 3600 604800 3600')
    dname_rrset = apex_rrset('DNAME', 'example.com.')
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    response = self.session.patch(
        zone_url,
        data=json.dumps({'rrsets': [soa_rrset, dname_rrset]}),
        headers={'content-type': 'application/json'})
    # user should be able to create DNAME at APEX as per RFC 6672 section 2.3
    self.assert_success(response)
1741
@parameterized.expand([
    ('SOA', 'ns1.example.org. test@example.org. 10 10800 3600 604800 1800'),
    ('DNSKEY', '257 3 8 AwEAAb/+pXOZWYQ8mv9WM5dFva8WU9jcIUdDuEjldbyfnkQ/xlrJC5zAEfhYhrea3SmIPmMTDimLqbh3/4SMTNPTUF+9+U1vpNfIRTFadqsmuU9Fddz3JqCcYwEpWbReg6DJOeyu+9oBoIQkPxFyLtIXEPGlQzrynKubn04Cx83I6NfzDTraJT3jLHKeW5PVc1ifqKzHz5TXdHHTA7NkJAa0sPcZCoNE1LpnJI/wcUpRUiuQhoLFeT1E432GuPuZ7y+agElGj0NnBxEgnHrhrnZWUbULpRa/il+Cr5Taj988HqX9Xdm6FjcP4Lbuds/44U7U8du224Q8jTrZ57Yvj4VDQKc='),
])
def test_only_at_apex(self, qtype, content):
    """The given type is accepted at the zone name but rejected (422) at sub.<zone>."""
    name, _, _ = self.create_zone(soa_edit_api='')
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    json_headers = {'content-type': 'application/json'}

    def rrset_at(owner):
        # Build a single-record 'replace' rrset of the tested type at owner.
        return {
            'changetype': 'replace',
            'name': owner,
            'type': qtype,
            'ttl': 3600,
            'records': [{"content": content, "disabled": False}],
        }

    # At the zone name the update is accepted and stored.
    apex_rrset = rrset_at(name)
    response = self.session.patch(zone_url, data=json.dumps({'rrsets': [apex_rrset]}),
                                  headers=json_headers)
    self.assert_success(response)
    zone_data = self.get_zone(name)
    self.assertEqual(get_rrset(zone_data, name, qtype)['records'], apex_rrset['records'])

    # Below the zone name the same record is refused and nothing is stored.
    sub_name = 'sub.' + name
    response = self.session.patch(zone_url, data=json.dumps({'rrsets': [rrset_at(sub_name)]}),
                                  headers=json_headers)
    self.assertEqual(response.status_code, 422)
    self.assertIn('only allowed at apex', response.json()['error'])
    zone_data = self.get_zone(name)
    self.assertIsNone(get_rrset(zone_data, sub_name, qtype))
1789
@parameterized.expand([
    ('DS', '44030 8 2 d4c3d5552b8679faeebc317e5f048b614b2e5f607dc57f1553182d49ab2179f7'),
])
def test_not_allowed_at_apex(self, qtype, content):
    """The given type is rejected (422) at the zone name but accepted at sub.<zone>.

    Parameters come from the ``parameterized.expand`` table: ``qtype`` is the
    record type under test and ``content`` the record content submitted.
    """
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": content,
                "disabled": False
            },
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('not allowed at apex', r.json()['error'])
    # The rejected rrset was submitted for the zone name itself, so that is
    # the name that must not carry it. (Looking up 'sub.' + name here, as the
    # test did before, could never fail because nothing had been written there.)
    data = self.get_zone(name)
    self.assertIsNone(get_rrset(data, name, qtype))

    rrset = {
        'changetype': 'replace',
        'name': 'sub.' + name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": content,
                "disabled": False
            },
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that the new record is there
    data = self.get_zone(name)
    self.assertEqual(get_rrset(data, 'sub.' + name, qtype)['records'], rrset['records'])
1836
def test_rr_svcb(self):
    """An SVCB record with mandatory, alpn, ipv4hint and ech parameters is accepted."""
    name, _, _ = self.create_zone()
    svcb_content = '40 . mandatory=alpn alpn=h2,h3 ipv4hint=192.0.2.1,192.0.2.2 ech="dG90YWxseSBib2d1cyBlY2hjb25maWcgdmFsdWU="'
    svcb_rrset = {
        'changetype': 'replace',
        'name': 'svcb.' + name,
        'type': 'SVCB',
        'ttl': 3600,
        'records': [{"content": svcb_content, "disabled": False}],
    }
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    response = self.session.patch(
        zone_url,
        data=json.dumps({'rrsets': [svcb_rrset]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
1855
def test_rrset_ns_dname_exclude(self):
    """After storing NS at delegation.<zone>, adding a DNAME there is rejected with 422."""
    name, _, _ = self.create_zone()
    owner = 'delegation.'+name
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    json_headers = {'content-type': 'application/json'}

    # First request: the NS rrset below the zone name is accepted.
    ns_rrset = {
        'changetype': 'replace',
        'name': owner,
        'type': 'NS',
        'ttl': 3600,
        'records': [{"content": "ns.example.org.", "disabled": False}],
    }
    response = self.session.patch(zone_url, data=json.dumps({'rrsets': [ns_rrset]}),
                                  headers=json_headers)
    self.assert_success(response)

    # Second request: a DNAME at the same name must be refused.
    dname_rrset = {
        'changetype': 'replace',
        'name': owner,
        'type': 'DNAME',
        'ttl': 3600,
        'records': [{"content": "example.com.", "disabled": False}],
    }
    response = self.session.patch(zone_url, data=json.dumps({'rrsets': [dname_rrset]}),
                                  headers=json_headers)
    self.assertEqual(response.status_code, 422)
    self.assertIn('Cannot have both NS and DNAME except in zone apex', response.json()['error'])
1891
1892## FIXME: Enable this when it's time for it
1893# def test_rrset_dname_nothing_under(self):
1894# name, payload, zone = self.create_zone()
1895# rrset = {
1896# 'changetype': 'replace',
1897# 'name': 'delegation.'+name,
1898# 'type': 'DNAME',
1899# 'ttl': 3600,
1900# 'records': [
1901# {
1902# "content": "example.com.",
1903# "disabled": False
1904# }
1905# ]
1906# }
1907# payload = {'rrsets': [rrset]}
1908# r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1909# headers={'content-type': 'application/json'})
1910# self.assert_success(r)
1911# rrset = {
1912# 'changetype': 'replace',
1913# 'name': 'sub.delegation.'+name,
1914# 'type': 'A',
1915# 'ttl': 3600,
1916# 'records': [
1917# {
1918# "content": "1.2.3.4",
1919# "disabled": False
1920# }
1921# ]
1922# }
1923# payload = {'rrsets': [rrset]}
1924# r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1925# headers={'content-type': 'application/json'})
1926# self.assertEqual(r.status_code, 422)
1927# self.assertIn('You cannot have record(s) under CNAME/DNAME', r.json()['error'])
1928
1929 def test_create_zone_with_leading_space(self):
1930 # Actual regression.
1931 name, payload, zone = self.create_zone()
1932 rrset = {
1933 'changetype': 'replace',
1934 'name': name,
1935 'type': 'A',
1936 'ttl': 3600,
1937 'records': [
1938 {
1939 "content": " 4.3.2.1",
1940 "disabled": False
1941 }
1942 ]
1943 }
1944 payload = {'rrsets': [rrset]}
1945 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1946 headers={'content-type': 'application/json'})
1947 self.assertEqual(r.status_code, 422)
1948 self.assertIn('Not in expected format', r.json()['error'])
1949
1950 @unittest.skipIf(is_auth_lmdb(), "No out-of-zone storage in LMDB")
1951 def test_zone_rr_delete_out_of_zone(self):
1952 name, payload, zone = self.create_zone()
1953 rrset = {
1954 'changetype': 'delete',
1955 'name': 'not-in-zone.',
1956 'type': 'NS'
1957 }
1958 payload = {'rrsets': [rrset]}
1959 r = self.session.patch(
1960 self.url("/api/v1/servers/localhost/zones/" + name),
1961 data=json.dumps(payload),
1962 headers={'content-type': 'application/json'})
1963 print(r.content)
1964 self.assert_success(r) # succeed so users can fix their wrong, old data
1965
1966 def test_zone_delete(self):
1967 name, payload, zone = self.create_zone()
1968 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
1969 self.assertEqual(r.status_code, 204)
1970 self.assertNotIn('Content-Type', r.headers)
1971
1972 def test_zone_comment_create(self):
1973 name, payload, zone = self.create_zone()
1974 rrset1 = {
1975 'changetype': 'replace',
1976 'name': name,
1977 'type': 'NS',
1978 'ttl': 3600,
1979 'comments': [
1980 {
1981 'account': 'test1',
1982 'content': 'blah blah',
1983 },
1984 {
1985 'account': 'test2',
1986 'content': 'blah blah bleh',
1987 }
1988 ]
1989 }
1990 rrset2 = {
1991 'changetype': 'replace',
1992 'name': name,
1993 'type': 'SOA',
1994 'ttl': 3600,
1995 'comments': [
1996 {
1997 'account': 'test3',
1998 'content': 'this should not show up later'
1999 }
2000 ]
2001 }
2002 payload = {'rrsets': [rrset1, rrset2]}
2003 r = self.session.patch(
2004 self.url("/api/v1/servers/localhost/zones/" + name),
2005 data=json.dumps(payload),
2006 headers={'content-type': 'application/json'})
2007 if is_auth_lmdb():
2008 self.assert_error_json(r) # No comments in LMDB
2009 return
2010 else:
2011 self.assert_success(r)
2012 # make sure the comments have been set, and that the NS
2013 # records are still present
2014 data = self.get_zone(name, rrset_name=name, rrset_type="NS")
2015 serverset = get_rrset(data, name, 'NS')
2016 print(serverset)
2017 self.assertNotEqual(serverset['records'], [])
2018 self.assertNotEqual(serverset['comments'], [])
2019 # verify that modified_at has been set by pdns
2020 self.assertNotEqual([c for c in serverset['comments']][0]['modified_at'], 0)
2021 # verify that unrelated comments do not leak into the result
2022 self.assertEqual(get_rrset(data, name, 'SOA'), None)
2023 # verify that TTL is correct (regression test)
2024 self.assertEqual(serverset['ttl'], 3600)
2025
2026 def test_zone_comment_delete(self):
2027 # Test: Delete ONLY comments.
2028 name, payload, zone = self.create_zone()
2029 rrset = {
2030 'changetype': 'replace',
2031 'name': name,
2032 'type': 'NS',
2033 'comments': []
2034 }
2035 payload = {'rrsets': [rrset]}
2036 r = self.session.patch(
2037 self.url("/api/v1/servers/localhost/zones/" + name),
2038 data=json.dumps(payload),
2039 headers={'content-type': 'application/json'})
2040 self.assert_success(r)
2041 # make sure the NS records are still present
2042 data = self.get_zone(name)
2043 serverset = get_rrset(data, name, 'NS')
2044 print(serverset)
2045 self.assertNotEqual(serverset['records'], [])
2046 self.assertEqual(serverset['comments'], [])
2047
2048 @unittest.skipIf(is_auth_lmdb(), "No comments in LMDB")
2049 def test_zone_comment_out_of_range_modified_at(self):
2050 # Test if a modified_at outside of the 32 bit range throws an error
2051 name, payload, zone = self.create_zone()
2052 rrset = {
2053 'changetype': 'replace',
2054 'name': name,
2055 'type': 'NS',
2056 'comments': [
2057 {
2058 'account': 'test1',
2059 'content': 'oh hi there',
2060 'modified_at': '4294967297'
2061 }
2062 ]
2063 }
2064 payload = {'rrsets': [rrset]}
2065 r = self.session.patch(
2066 self.url("/api/v1/servers/localhost/zones/" + name),
2067 data=json.dumps(payload),
2068 headers={'content-type': 'application/json'})
2069 self.assertEqual(r.status_code, 422)
2070 self.assertIn("Key 'modified_at' is out of range", r.json()['error'])
2071
2072 @unittest.skipIf(is_auth_lmdb(), "No comments in LMDB")
2073 def test_zone_comment_stay_intact(self):
2074 # Test if comments on an rrset stay intact if the rrset is replaced
2075 name, payload, zone = self.create_zone()
2076 # create a comment
2077 rrset = {
2078 'changetype': 'replace',
2079 'name': name,
2080 'type': 'NS',
2081 'comments': [
2082 {
2083 'account': 'test1',
2084 'content': 'oh hi there',
2085 'modified_at': 1111
2086 }
2087 ]
2088 }
2089 payload = {'rrsets': [rrset]}
2090 r = self.session.patch(
2091 self.url("/api/v1/servers/localhost/zones/" + name),
2092 data=json.dumps(payload),
2093 headers={'content-type': 'application/json'})
2094 self.assert_success(r)
2095 # replace rrset records
2096 rrset2 = {
2097 'changetype': 'replace',
2098 'name': name,
2099 'type': 'NS',
2100 'ttl': 3600,
2101 'records': [
2102 {
2103 "content": "ns1.bar.com.",
2104 "disabled": False
2105 }
2106 ]
2107 }
2108 payload2 = {'rrsets': [rrset2]}
2109 r = self.session.patch(
2110 self.url("/api/v1/servers/localhost/zones/" + name),
2111 data=json.dumps(payload2),
2112 headers={'content-type': 'application/json'})
2113 self.assert_success(r)
2114 # make sure the comments still exist
2115 data = self.get_zone(name)
2116 serverset = get_rrset(data, name, 'NS')
2117 print(serverset)
2118 self.assertEqual(serverset['records'], rrset2['records'])
2119 self.assertEqual(serverset['comments'], rrset['comments'])
2120
2121 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2122 def test_search_rr_exact_zone(self):
2123 name = unique_zone_name()
2124 self.create_zone(name=name, serial=22, soa_edit_api='')
2125 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')))
2126 self.assert_success_json(r)
2127 print(r.json())
2128 self.assertCountEqual(r.json(), [
2129 {u'object_type': u'zone', u'name': name, u'zone_id': name},
2130 {u'content': u'ns1.example.com.',
2131 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
2132 u'ttl': 3600, u'type': u'NS', u'name': name},
2133 {u'content': u'ns2.example.com.',
2134 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
2135 u'ttl': 3600, u'type': u'NS', u'name': name},
2136 {u'content': u'a.misconfigured.dns.server.invalid. hostmaster.'+name+' 22 10800 3600 604800 3600',
2137 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
2138 u'ttl': 3600, u'type': u'SOA', u'name': name},
2139 ])
2140
2141 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2142 def test_search_rr_exact_zone_filter_type_zone(self):
2143 name = unique_zone_name()
2144 data_type = "zone"
2145 self.create_zone(name=name, serial=22, soa_edit_api='')
2146 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
2147 self.assert_success_json(r)
2148 print(r.json())
2149 self.assertEqual(r.json(), [
2150 {u'object_type': u'zone', u'name': name, u'zone_id': name},
2151 ])
2152
2153 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2154 def test_search_rr_exact_zone_filter_type_record(self):
2155 name = unique_zone_name()
2156 data_type = "record"
2157 self.create_zone(name=name, serial=22, soa_edit_api='')
2158 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
2159 self.assert_success_json(r)
2160 print(r.json())
2161 self.assertCountEqual(r.json(), [
2162 {u'content': u'ns1.example.com.',
2163 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
2164 u'ttl': 3600, u'type': u'NS', u'name': name},
2165 {u'content': u'ns2.example.com.',
2166 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
2167 u'ttl': 3600, u'type': u'NS', u'name': name},
2168 {u'content': u'a.misconfigured.dns.server.invalid. hostmaster.'+name+' 22 10800 3600 604800 3600',
2169 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
2170 u'ttl': 3600, u'type': u'SOA', u'name': name},
2171 ])
2172
2173 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2174 def test_search_rr_substring(self):
2175 name = unique_zone_name()
2176 search = name[5:-5]
2177 self.create_zone(name=name)
2178 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
2179 self.assert_success_json(r)
2180 print(r.json())
2181 # should return zone, SOA, ns1, ns2
2182 self.assertEqual(len(r.json()), 4)
2183
2184 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2185 def test_search_rr_case_insensitive(self):
2186 name = unique_zone_name()+'testsuffix.'
2187 self.create_zone(name=name)
2188 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
2189 self.assert_success_json(r)
2190 print(r.json())
2191 # should return zone, SOA, ns1, ns2
2192 self.assertEqual(len(r.json()), 4)
2193
2194 @unittest.skipIf(is_auth_lmdb(), "No search or comments in LMDB")
2195 def test_search_rr_comment(self):
2196 name = unique_zone_name()
2197 rrsets = [{
2198 "name": name,
2199 "type": "AAAA",
2200 "ttl": 3600,
2201 "records": [{
2202 "content": "2001:DB8::1",
2203 "disabled": False,
2204 }],
2205 "comments": [{
2206 "account": "test AAAA",
2207 "content": "blah",
2208 "modified_at": 11112,
2209 }],
2210 }]
2211 name, payload, data = self.create_zone(name=name, rrsets=rrsets)
2212 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=blah"))
2213 self.assert_success_json(r)
2214 data = r.json()
2215 # should return the AAAA record
2216 self.assertEqual(len(data), 1)
2217 self.assertEqual(data[0]['object_type'], 'comment')
2218 self.assertEqual(data[0]['type'], 'AAAA')
2219 self.assertEqual(data[0]['name'], name)
2220 self.assertEqual(data[0]['content'], rrsets[0]['comments'][0]['content'])
2221
2222 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2223 def test_search_after_rectify_with_ent(self):
2224 name = unique_zone_name()
2225 search = name.split('.')[0]
2226 rrset = {
2227 "name": 'sub.sub.' + name,
2228 "type": "A",
2229 "ttl": 3600,
2230 "records": [{
2231 "content": "4.3.2.1",
2232 "disabled": False,
2233 }],
2234 }
2235 self.create_zone(name=name, rrsets=[rrset])
2236 pdnsutil_rectify(name)
2237 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
2238 self.assert_success_json(r)
2239 print(r.json())
2240 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
2241 self.assertEqual(len(r.json()), 5)
2242
2243 @unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
2244 def test_default_api_rectify_dnssec(self):
2245 name = unique_zone_name()
2246 rrsets = [
2247 {
2248 "name": 'a.' + name,
2249 "type": "AAAA",
2250 "ttl": 3600,
2251 "records": [{
2252 "content": "2001:DB8::1",
2253 "disabled": False,
2254 }],
2255 },
2256 {
2257 "name": 'b.' + name,
2258 "type": "AAAA",
2259 "ttl": 3600,
2260 "records": [{
2261 "content": "2001:DB8::2",
2262 "disabled": False,
2263 }],
2264 },
2265 ]
2266 self.create_zone(name=name, rrsets=rrsets, dnssec=True, nsec3param='1 0 1 ab')
2267 dbrecs = get_db_records(name, 'AAAA')
2268 self.assertIsNotNone(dbrecs[0]['ordername'])
2269
2270 def test_default_api_rectify_nodnssec(self):
2271 """Without any DNSSEC settings, rectify should still add ENTs. Setup the zone
2272 so ENTs are necessary, and check for their existence using sdig.
2273 """
2274 name = unique_zone_name()
2275 rrsets = [
2276 {
2277 "name": 'a.sub.' + name,
2278 "type": "AAAA",
2279 "ttl": 3600,
2280 "records": [{
2281 "content": "2001:DB8::1",
2282 "disabled": False,
2283 }],
2284 },
2285 {
2286 "name": 'b.sub.' + name,
2287 "type": "AAAA",
2288 "ttl": 3600,
2289 "records": [{
2290 "content": "2001:DB8::2",
2291 "disabled": False,
2292 }],
2293 },
2294 ]
2295 self.create_zone(name=name, rrsets=rrsets)
2296 # default-api-rectify is yes (by default). expect rectify to have happened.
2297 assert 'Rcode: 0 ' in sdig('sub.' + name, 'TXT')
2298
2299 @unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
2300 def test_override_api_rectify(self):
2301 name = unique_zone_name()
2302 search = name.split('.')[0]
2303 rrsets = [
2304 {
2305 "name": 'a.' + name,
2306 "type": "AAAA",
2307 "ttl": 3600,
2308 "records": [{
2309 "content": "2001:DB8::1",
2310 "disabled": False,
2311 }],
2312 },
2313 {
2314 "name": 'b.' + name,
2315 "type": "AAAA",
2316 "ttl": 3600,
2317 "records": [{
2318 "content": "2001:DB8::2",
2319 "disabled": False,
2320 }],
2321 },
2322 ]
2323 self.create_zone(name=name, rrsets=rrsets, api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
2324 dbrecs = get_db_records(name, 'AAAA')
2325 self.assertIsNone(dbrecs[0]['ordername'])
2326
2327 @unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
2328 def test_explicit_rectify_success(self):
2329 name, _, data = self.create_zone = self.create_zone(api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
2330 dbrecs = get_db_records(name, 'SOA')
2331 self.assertIsNone(dbrecs[0]['ordername'])
2332 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/rectify"))
2333 self.assertEqual(r.status_code, 200)
2334 dbrecs = get_db_records(name, 'SOA')
2335 self.assertIsNotNone(dbrecs[0]['ordername'])
2336
2337 def test_explicit_rectify_slave(self):
2338 # Some users want to move a zone to kind=Slave and then rectify, without a re-transfer.
2339 name, _, data = self.create_zone = self.create_zone(api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
2340 self.put_zone(data['id'], {'kind': 'Slave'})
2341 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/rectify"))
2342 self.assertEqual(r.status_code, 200)
2343 if not is_auth_lmdb():
2344 dbrecs = get_db_records(name, 'SOA')
2345 self.assertIsNotNone(dbrecs[0]['ordername'])
2346
2347 def test_cname_at_ent_place(self):
2348 name, payload, zone = self.create_zone(dnssec=True, api_rectify=True)
2349 rrset = {
2350 'changetype': 'replace',
2351 'name': 'sub2.sub1.' + name,
2352 'type': "A",
2353 'ttl': 3600,
2354 'records': [{
2355 'content': "4.3.2.1",
2356 'disabled': False,
2357 }],
2358 }
2359 payload = {'rrsets': [rrset]}
2360 r = self.session.patch(
2361 self.url("/api/v1/servers/localhost/zones/" + zone['id']),
2362 data=json.dumps(payload),
2363 headers={'content-type': 'application/json'})
2364 self.assertEqual(r.status_code, 204)
2365 rrset = {
2366 'changetype': 'replace',
2367 'name': 'sub1.' + name,
2368 'type': "CNAME",
2369 'ttl': 3600,
2370 'records': [{
2371 'content': "www.example.org.",
2372 'disabled': False,
2373 }],
2374 }
2375 payload = {'rrsets': [rrset]}
2376 r = self.session.patch(
2377 self.url("/api/v1/servers/localhost/zones/" + zone['id']),
2378 data=json.dumps(payload),
2379 headers={'content-type': 'application/json'})
2380 self.assertEqual(r.status_code, 204)
2381
2382 def test_rrset_parameter_post_false(self):
2383 name = unique_zone_name()
2384 payload = {
2385 'name': name,
2386 'kind': 'Native',
2387 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
2388 }
2389 r = self.session.post(
2390 self.url("/api/v1/servers/localhost/zones?rrsets=false"),
2391 data=json.dumps(payload),
2392 headers={'content-type': 'application/json'})
2393 print(r.json())
2394 self.assert_success_json(r)
2395 self.assertEqual(r.status_code, 201)
2396 self.assertEqual(r.json().get('rrsets'), None)
2397
2398 def test_rrset_false_parameter(self):
2399 name = unique_zone_name()
2400 self.create_zone(name=name, kind='Native')
2401 data = self.get_zone(name, rrsets="false")
2402 self.assertEqual(data.get('rrsets'), None)
2403
2404 def test_rrset_true_parameter(self):
2405 name = unique_zone_name()
2406 self.create_zone(name=name, kind='Native')
2407 data = self.get_zone(name, rrsets="true")
2408 self.assertEqual(len(data['rrsets']), 2)
2409
2410 def test_wrong_rrset_parameter(self):
2411 name = unique_zone_name()
2412 self.create_zone(name=name, kind='Native')
2413 self.get_zone(
2414 name, rrsets="foobar",
2415 expect_error="'rrsets' request parameter value 'foobar' is not supported"
2416 )
2417
2418 def test_put_master_tsig_key_ids_non_existent(self):
2419 name = unique_zone_name()
2420 keyname = unique_zone_name().split('.')[0]
2421 self.create_zone(name=name, kind='Native')
2422 payload = {
2423 'master_tsig_key_ids': [keyname]
2424 }
2425 self.put_zone(name, payload, expect_error='A TSIG key with the name')
2426
2427 def test_put_slave_tsig_key_ids_non_existent(self):
2428 name = unique_zone_name()
2429 keyname = unique_zone_name().split('.')[0]
2430 self.create_zone(name=name, kind='Native')
2431 payload = {
2432 'slave_tsig_key_ids': [keyname]
2433 }
2434 self.put_zone(name, payload, expect_error='A TSIG key with the name')
2435
2436 def test_zone_replace_rrsets_basic(self):
2437 """Basic test: all automatic modification is off, on replace the new rrsets are ingested as is."""
2438 name, _, _ = self.create_zone(dnssec=False, soa_edit='', soa_edit_api='')
2439 rrsets = [
2440 {'name': name, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2441 {'name': name, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2442 {'name': 'www.' + name, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
2443 {'name': 'sub.' + name, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}]},
2444 ]
2445 self.put_zone(name, {'rrsets': rrsets})
2446
2447 data = self.get_zone(name)
2448 for rrset in rrsets:
2449 rrset.setdefault('comments', [])
2450 for record in rrset['records']:
2451 record.setdefault('disabled', False)
2452 assert_eq_rrsets(data['rrsets'], rrsets)
2453
2454 def test_zone_replace_rrsets_dnssec(self):
2455 """With dnssec: check automatic rectify is done"""
2456 name, _, _ = self.create_zone(dnssec=True)
2457 rrsets = [
2458 {'name': name, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2459 {'name': name, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2460 {'name': 'www.' + name, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
2461 ]
2462 self.put_zone(name, {'rrsets': rrsets})
2463
2464 if not is_auth_lmdb():
2465 # lmdb: skip, no get_db_records implementations
2466 dbrecs = get_db_records(name, 'A')
2467 assert dbrecs[0]['ordername'] is not None # default = rectify enabled
2468
2469 def test_zone_replace_rrsets_with_soa_edit(self):
2470 """SOA-EDIT was enabled before rrsets will be replaced"""
2471 name, _, _ = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
2472 rrsets = [
2473 {'name': name, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2474 {'name': name, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2475 {'name': 'www.' + name, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
2476 {'name': 'sub.' + name, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}]},
2477 ]
2478 self.put_zone(name, {'rrsets': rrsets})
2479
2480 data = self.get_zone(name)
2481 soa = [rrset['records'][0]['content'] for rrset in data['rrsets'] if rrset['type'] == 'SOA'][0]
2482 assert int(soa.split()[2]) > 1 # serial is larger than what we sent
2483
2484 def test_zone_replace_rrsets_no_soa_primary(self):
2485 """Replace all RRsets but supply no SOA. Should fail."""
2486 name, _, _ = self.create_zone()
2487 rrsets = [
2488 {'name': name, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]}
2489 ]
2490 self.put_zone(name, {'rrsets': rrsets}, expect_error='Must give SOA record for zone when replacing all RR sets')
2491
2492 @parameterized.expand([
2493 (None, []),
2494 (None, [
2495 {'name': '$NAME$', 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2496 ]),
2497 ])
2498 def test_zone_replace_rrsets_secondary(self, expected_error, rrsets):
2499 """
2500 Replace all RRsets in a SECONDARY zone.
2501
2502 If no SOA is given, this should still succeed, also setting zone stale (but cannot assert this here).
2503 """
2504 name, _, _ = self.create_zone(kind='Secondary', nameservers=None, masters=['127.0.0.2'])
2505 self.put_zone(name, {'rrsets': templated_rrsets(rrsets, name)}, expect_error=expected_error)
2506
2507 @parameterized.expand([
2508 (None, []),
2509 ("Modifying RRsets in Consumer zones is unsupported", [
2510 {'name': '$NAME$', 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2511 ]),
2512 ])
2513 def test_zone_replace_rrsets_consumer(self, expected_error, rrsets):
2514 name, _, _ = self.create_zone(kind='Consumer', nameservers=None, masters=['127.0.0.2'])
2515 self.put_zone(name, {'rrsets': templated_rrsets(rrsets, name)}, expect_error=expected_error)
2516
2517 def test_zone_replace_rrsets_negative_ttl(self):
2518 name, _, _ = self.create_zone(dnssec=False, soa_edit='', soa_edit_api='')
2519 rrsets = [
2520 {'name': name, 'type': 'SOA', 'ttl': -1, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2521 ]
2522 self.put_zone(name, {'rrsets': rrsets}, expect_error="Key 'ttl' is not a positive Integer")
2523
2524 @parameterized.expand([
2525 ("IN MX: non-hostname content", [{'name': '$NAME$', 'type': 'MX', 'ttl': 3600, 'records': [{"content": "10 mail@mx.example.org."}]}]),
2526 ("out of zone", [{'name': 'not-in-zone.', 'type': 'NS', 'ttl': 3600, 'records': [{"content": "ns1.example.org."}]}]),
2527 ("contains unsupported characters", [{'name': 'test:.$NAME$', 'type': 'NS', 'ttl': 3600, 'records': [{"content": "ns1.example.org."}]}]),
2528 ("unknown type", [{'name': '$NAME$', 'type': 'INVALID', 'ttl': 3600, 'records': [{"content": "192.0.2.1"}]}]),
2529 ("Conflicts with another RRset", [{'name': '$NAME$', 'type': 'CNAME', 'ttl': 3600, 'records': [{"content": "example.org."}]}]),
2530 ])
2531 def test_zone_replace_rrsets_invalid(self, expected_error, invalid_rrsets):
2532 """Test validation of RRsets before replacing them"""
2533 name, _, _ = self.create_zone(dnssec=False, soa_edit='', soa_edit_api='')
2534 base_rrsets = [
2535 {'name': name, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2536 {'name': name, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2537 ]
2538 rrsets = base_rrsets + templated_rrsets(invalid_rrsets, name)
2539 self.put_zone(name, {'rrsets': rrsets}, expect_error=expected_error)
2540
2541
@unittest.skipIf(not is_auth(), "Not applicable")
class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
    """API tests against the root zone ('.'), addressed by the zone id '=2E'."""

    def setUp(self):
        """Delete any existing root zone first, because its name cannot be made unique."""
        super(AuthRootZone, self).setUp()
        root_zone_url = self.url("/api/v1/servers/localhost/zones/=2E")
        self.session.delete(root_zone_url)

    def test_create_zone(self):
        """Create the root zone and check its fields, generated SOA, the zone list and a fetch."""
        name, payload, data = self.create_zone(name='.', serial=22, soa_edit_api='')
        required_fields = (
            'id', 'url', 'name', 'masters', 'kind', 'last_check',
            'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account',
        )
        for field in required_fields:
            self.assertIn(field, data)
            # Fields we sent ourselves must be echoed back unchanged.
            if field in payload:
                self.assertEqual(data[field], payload[field])

        # Validate the generated SOA.
        soa = get_first_rec(data, '.', 'SOA')
        expected_soa = (
            "a.misconfigured.dns.server.invalid. hostmaster. "
            + str(payload['serial'])
            + " 10800 3600 604800 3600"
        )
        self.assertEqual(soa['content'], expected_soa)

        # Regression test: verify the zone list works.
        zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
        print("zonelist:", zonelist)
        listed_names = [zone['name'] for zone in zonelist]
        self.assertIn(payload['name'], listed_names)

        # Fetching the zone by its id has to work as well.
        print("id:", data['id'])
        self.assertEqual(data['id'], '=2E')
        data = self.get_zone(data['id'])
        print("zone (fetched):", data)
        for field in ('name', 'kind'):
            self.assertIn(field, data)
            self.assertEqual(data[field], payload[field])
        self.assertEqual(data['rrsets'][0]['name'], '.')

    def test_update_zone(self):
        """Switch the root zone to Master with SOA-EDIT settings, then back to Native."""
        self.create_zone(name='.')
        zone_id = '=2E'
        updates = (
            # First: set as Master and enable SOA-EDIT-API.
            {
                'kind': 'Master',
                'masters': ['192.0.2.1', '192.0.2.2'],
                'soa_edit_api': 'EPOCH',
                'soa_edit': 'EPOCH',
            },
            # Then: back to Native, with the SOA-EDIT settings emptied (off).
            {
                'kind': 'Native',
                'soa_edit_api': '',
                'soa_edit': '',
            },
        )
        for update in updates:
            self.put_zone(zone_id, update)
            fetched = self.get_zone(zone_id)
            for key in update:
                self.assertIn(key, fetched)
                self.assertEqual(fetched[key], update[key])
2603
2604
@unittest.skipIf(not is_recursor(), "Not applicable")
class RecursorZones(ApiTestCase):
    """Zone API tests that are skipped unless is_recursor() is true."""

    def create_zone(self, name=None, kind=None, rd=False, servers=None, notify_allowed=False):
        """POST a new zone and return ``(payload, response_json)``.

        A unique name is generated when ``name`` is None, and ``servers``
        defaults to an empty list. The request is asserted to succeed.
        """
        payload = {
            'name': unique_zone_name() if name is None else name,
            'kind': kind,
            'servers': [] if servers is None else servers,
            'recursion_desired': rd,
            'notify_allowed': notify_allowed,
        }
        response = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success_json(response)
        return payload, response.json()

    def _assert_payload_echoed(self, payload, data):
        """Assert that every key of ``payload`` has the same value in ``data``."""
        for key in payload:
            self.assertEqual(data[key], payload[key])

    def test_create_auth_zone(self):
        """A Native zone is created and all submitted fields are echoed back."""
        payload, data = self.create_zone(kind='Native')
        self._assert_payload_echoed(payload, data)

    def test_create_zone_no_name(self):
        """An empty zone name is rejected with 422."""
        nameless = {
            'name': '',
            'kind': 'Native',
            'servers': ['8.8.8.8'],
            'recursion_desired': False,
        }
        print(nameless)
        response = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(nameless),
            headers={'content-type': 'application/json'})
        self.assertEqual(response.status_code, 422)
        self.assertIn('is not canonical', response.json()['error'])

    def test_create_forwarded_zone(self):
        """A Forwarded zone reports its server with ':53' appended."""
        payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
        # Return values are normalized, so adjust the expectation to match.
        payload['servers'][0] += ':53'
        self._assert_payload_echoed(payload, data)

    def test_create_forwarded_zone_notify_allowed(self):
        """Same as the Forwarded case, with notify_allowed=True echoed back."""
        payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'], notify_allowed=True)
        # Return values are normalized, so adjust the expectation to match.
        payload['servers'][0] += ':53'
        self._assert_payload_echoed(payload, data)

    def test_create_forwarded_rd_zone(self):
        """A Forwarded zone with recursion desired, created under a fixed name."""
        payload, data = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
        # Return values are normalized, so adjust the expectation to match.
        payload['servers'][0] += ':53'
        self._assert_payload_echoed(payload, data)

    def test_create_auth_zone_with_symbols(self):
        """A '/' in the zone name shows up as '=2F' in the zone id."""
        zone_name = 'foo/bar.' + unique_zone_name()
        payload, data = self.create_zone(name=zone_name, kind='Native')
        expected_id = payload['name'].replace('/', '=2F')
        self._assert_payload_echoed(payload, data)
        self.assertEqual(data['id'], expected_id)

    def test_rename_auth_zone(self):
        """PUT a new name onto an existing zone and fetch it under that new name."""
        payload, data = self.create_zone(kind='Native')
        old_name = payload['name']
        # Now rename it.
        renamed = {
            'name': 'renamed-' + old_name,
            'kind': 'Native',
            'recursion_desired': False,
        }
        response = self.session.put(
            self.url("/api/v1/servers/localhost/zones/" + old_name),
            data=json.dumps(renamed),
            headers={'content-type': 'application/json'})
        self.assert_success(response)
        fetched = self.session.get(self.url("/api/v1/servers/localhost/zones/" + renamed['name'])).json()
        self._assert_payload_echoed(renamed, fetched)

    def test_zone_delete(self):
        """Deleting a zone answers 204 and sends no Content-Type header."""
        payload, zone = self.create_zone(kind='Native')
        zone_url = self.url("/api/v1/servers/localhost/zones/" + payload['name'])
        response = self.session.delete(zone_url)
        self.assertEqual(response.status_code, 204)
        self.assertNotIn('Content-Type', response.headers)

    def test_search_rr_exact_zone(self):
        """Searching for the exact zone name returns exactly the zone entry."""
        zone_name = unique_zone_name()
        self.create_zone(name=zone_name, kind='Native')
        response = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + zone_name))
        self.assert_success_json(response)
        print(response.json())
        only_zone = [{'type': 'zone', 'name': zone_name, 'zone_id': zone_name}]
        self.assertEqual(response.json(), only_zone)

    def test_search_rr_substring(self):
        """Searching for part of a zone name returns two results."""
        self.create_zone(name='search-rr-zone.name.', kind='Native')
        response = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
        self.assert_success_json(response)
        print(response.json())
        # Expected hits: zone, SOA.
        hits = response.json()
        self.assertEqual(len(hits), 2)
2716
2717@unittest.skipIf(not is_auth(), "Not applicable")
2718class AuthZoneKeys(ApiTestCase, AuthZonesHelperMixin):
2719
2720 def test_get_keys(self):
2721 r = self.session.get(
2722 self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2723 self.assert_success_json(r)
2724 keys = r.json()
2725 self.assertGreater(len(keys), 0)
2726
2727 key0 = deepcopy(keys[0])
2728 del key0['dnskey']
2729 del key0['ds']
2730 expected = {
2731 u'algorithm': u'ECDSAP256SHA256',
2732 u'bits': 256,
2733 u'active': True,
2734 u'type': u'Cryptokey',
2735 u'keytype': u'csk',
2736 u'flags': 257,
2737 u'published': True,
2738 u'id': 1}
2739 self.assertEqual(key0, expected)
2740
2741 keydata = keys[0]['dnskey'].split()
2742 self.assertEqual(len(keydata), 4)
2743
    def test_get_keys_with_cds(self):
        """Set PUBLISH-CDS metadata on powerdnssec.org. and check the 'cds' member of its first cryptokey.

        The metadata is deleted again at the end of the test.
        """
        # Publish CDS with the value "4" via the zone metadata endpoint.
        payload_metadata = {"type": "Metadata", "kind": "PUBLISH-CDS", "metadata": ["4"]}
        r = self.session.post(self.url("/api/v1/servers/localhost/zones/powerdnssec.org./metadata"),
                              data=json.dumps(payload_metadata))
        rdata = r.json()
        self.assertEqual(r.status_code, 201)
        self.assertEqual(rdata["metadata"], payload_metadata["metadata"])

        r = self.session.get(
            self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
        self.assert_success_json(r)
        keys = r.json()
        self.assertGreater(len(keys), 0)

        # Work on a copy so keys[0] stays intact for the dnskey check below.
        key0 = deepcopy(keys[0])
        # Exactly one CDS entry, which must also be listed among the DS entries.
        self.assertEqual(len(key0['cds']), 1)
        self.assertIn(key0['cds'][0], key0['ds'])
        # Third field of the CDS matches the "4" set in the metadata
        # (presumably the digest type -- TODO confirm).
        self.assertEqual(key0['cds'][0].split()[2], '4')
        # Remove the members that are not part of the fixed expectation.
        del key0['dnskey']
        del key0['ds']
        del key0['cds']
        expected = {
            u'algorithm': u'ECDSAP256SHA256',
            u'bits': 256,
            u'active': True,
            u'type': u'Cryptokey',
            u'keytype': u'csk',
            u'flags': 257,
            u'published': True,
            u'id': 1}
        self.assertEqual(key0, expected)

        keydata = keys[0]['dnskey'].split()
        self.assertEqual(len(keydata), 4)

        # Clean up the metadata set at the start of the test.
        # NOTE(review): this only runs if every assertion above passed; on a
        # failure PUBLISH-CDS stays set on the shared zone -- consider addCleanup.
        r = self.session.delete(self.url("/api/v1/servers/localhost/zones/powerdnssec.org./metadata/PUBLISH-CDS"))
        self.assertEqual(r.status_code, 204)