]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
code review from otto, thanks
[thirdparty/pdns.git] / regression-tests.api / test_Zones.py
1 from __future__ import print_function
2 import json
3 import operator
4 import time
5 import unittest
6 import requests.exceptions
7 from copy import deepcopy
8 from parameterized import parameterized
9 from pprint import pprint
10 from test_helper import ApiTestCase, unique_zone_name, is_auth, is_auth_lmdb, is_recursor, get_db_records, pdnsutil_rectify, sdig
11
12
def get_rrset(data, qname, qtype):
    """Return the first entry of data['rrsets'] matching qname and qtype, or None."""
    return next(
        (entry for entry in data['rrsets']
         if entry['name'] == qname and entry['type'] == qtype),
        None,
    )
18
19
def get_first_rec(data, qname, qtype):
    """Return the first record of the rrset found by get_rrset, or None if no (truthy) rrset matched."""
    match = get_rrset(data, qname, qtype)
    return match['records'][0] if match else None
25
26
def eq_zone_rrsets(rrsets, expected):
    """
    Assert that `rrsets` contains exactly the records listed in `expected`.

    `expected` maps a record type to a list of record dicts. For each type,
    both sides are reduced to sets of (name, type, content) tuples; when none
    of the expected records of a type carries a 'name', '@' is used as the
    name on both sides. Types absent from `expected` are not compared.
    """
    got_by_type = {}
    expected_by_type = {}
    for raw_type, wanted_records in expected.items():
        rtype = str(raw_type)
        named = any('name' in wanted for wanted in wanted_records)

        # minify + convert received data
        matching = [entry for entry in rrsets if entry['type'] == rtype]
        received = set()
        for entry in matching:
            print(entry)
            received.update(
                (entry['name'] if named else '@', entry['type'], rec['content'])
                for rec in entry['records']
            )
        got_by_type[rtype] = received

        # minify expected data
        expected_by_type[rtype] = {
            (wanted['name'] if named else '@', rtype, wanted['content'])
            for wanted in wanted_records
        }

    print("eq_zone_rrsets: got:")
    pprint(got_by_type)
    print("eq_zone_rrsets: expected:")
    pprint(expected_by_type)

    assert got_by_type == expected_by_type, "%r != %r" % (got_by_type, expected_by_type)
50
51
def assert_eq_rrsets(rrsets, expected):
    """Assert both rrset lists hold equal entries once sorted by (name, type)."""
    by_name_and_type = operator.itemgetter('name', 'type')
    assert sorted(rrsets, key=by_name_and_type) == sorted(expected, key=by_name_and_type)
56
57
def templated_rrsets(rrsets: list, zonename: str):
    """
    Substitute `zonename` for every $NAME$ placeholder in the rrsets.

    The placeholder is replaced in each rrset's `name` and in the `content`
    of each of its records. New dicts and lists are built for the result, so
    the rrsets passed in are left as they were.
    """
    def fill(text):
        return text.replace('$NAME$', zonename)

    result = []
    for original in rrsets:
        clone = {**original, "name": fill(original["name"])}
        if "records" in original:
            clone["records"] = [
                {**rec, "content": fill(rec["content"])}
                for rec in original["records"]
            ]
        result.append(clone)
    return result
76
77
class Zones(ApiTestCase):
    """Checks on the zone listing endpoint, with and without the dnssec flag."""

    def _test_list_zones(self, dnssec=True):
        """
        Fetch the zone list and verify the example.com entry.

        With dnssec=False the request carries "?dnssec=false". Exactly one
        entry named 'example.com' or 'example.com.' must be listed, and it
        must contain every field required for the running daemon flavour.
        """
        path = "/api/v1/servers/localhost/zones"
        if not dnssec:
            path = path + "?dnssec=false"
        r = self.session.get(self.url(path))
        self.assert_success_json(r)
        domains = r.json()
        example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
        self.assertEqual(len(example_com), 1)
        example_com = example_com[0]
        print(example_com)
        required_fields = ['id', 'url', 'name', 'kind']
        if is_auth():
            required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial', 'account', 'catalog']
            if dnssec:
                # Extend the list; assigning a fresh list here would drop
                # every field collected above from the check below.
                required_fields = required_fields + ['dnssec', 'edited_serial']
            self.assertNotEqual(example_com['serial'], 0)
            if not dnssec:
                self.assertNotIn('dnssec', example_com)
        elif is_recursor():
            required_fields = required_fields + ['recursion_desired', 'servers']
        for field in required_fields:
            self.assertIn(field, example_com)

    def test_list_zones_with_dnssec(self):
        """List zones with dnssec data; only exercised when is_auth() is true."""
        if is_auth():
            self._test_list_zones(True)

    def test_list_zones_without_dnssec(self):
        """List zones with "?dnssec=false"."""
        self._test_list_zones(False)
110
111
class AuthZonesHelperMixin(object):
    """
    Zone create/get/put helpers for API test cases.

    The host class must provide `self.session`, `self.url()` and the
    assertion methods used below (assertEqual, assertIn, assert_success_json).
    """

    def create_zone(self, name=None, expect_error=None, **kwargs):
        """
        POST a new zone and return (name, payload, reply).

        name: zone name; a unique one is generated when None.
        expect_error: falsy to require a 201 reply; True to require a 422
            reply; any other value must additionally be contained in the
            reply's "error" field.
        kwargs: merged into the payload. A value of None removes that key
            from the payload (e.g. nameservers=None).

        $NAME$ in a "zone" string or in "rrsets" is replaced by the zone name.
        """
        if name is None:
            name = unique_zone_name()
        payload = {
            "name": name,
            "kind": "Native",
            "nameservers": ["ns1.example.com.", "ns2.example.com."]
        }
        for k, v in kwargs.items():
            if v is None:
                # pop() instead of del: a None for a key that has no default
                # simply means "leave it out", not a KeyError.
                payload.pop(k, None)
            else:
                payload[k] = v
        if "zone" in payload:
            payload["zone"] = payload["zone"].replace("$NAME$", payload["name"])
        if "rrsets" in payload:
            payload["rrsets"] = templated_rrsets(payload["rrsets"], payload["name"])

        print("Create zone", name, "with:", payload)
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={"content-type": "application/json"})

        if expect_error:
            self.assertEqual(r.status_code, 422, r.content)
            reply = r.json()
            if expect_error is not True:
                self.assertIn(expect_error, reply["error"])
        else:
            # expect success
            self.assertEqual(r.status_code, 201, r.content)
            reply = r.json()

        return name, payload, reply

    def get_zone(self, api_zone_id, expect_error=None, **kwargs):
        """
        GET a zone by its API id and return the decoded JSON reply.

        kwargs are sent as query parameters. expect_error follows the same
        convention as in create_zone (422 expected); otherwise a successful
        JSON reply with status 200 is required.
        """
        print("GET zone", api_zone_id, "args:", kwargs)
        r = self.session.get(
            self.url("/api/v1/servers/localhost/zones/" + api_zone_id),
            params=kwargs
        )

        reply = r.json()
        print("reply", reply)

        if expect_error:
            self.assertEqual(r.status_code, 422, r.content)
            if expect_error is not True:
                # reuse the already decoded body instead of parsing it again
                self.assertIn(expect_error, reply['error'])
        else:
            # expect success
            self.assert_success_json(r)
            self.assertEqual(r.status_code, 200)

        return reply

    def put_zone(self, api_zone_id, payload, expect_error=None):
        """
        PUT `payload` to a zone.

        expect_error follows the same convention as in create_zone (422
        expected); otherwise a 204 reply is required. Returns nothing.
        """
        print("PUT zone", api_zone_id, "with:", payload)
        r = self.session.put(
            self.url("/api/v1/servers/localhost/zones/" + api_zone_id),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})

        print("reply status code:", r.status_code)
        if expect_error:
            self.assertEqual(r.status_code, 422, r.content)
            reply = r.json()
            if expect_error is not True:
                self.assertIn(expect_error, reply['error'])
        else:
            # expect success (no content)
            self.assertEqual(r.status_code, 204, r.content)
192
193 @unittest.skipIf(not is_auth(), "Not applicable")
194 class AuthZones(ApiTestCase, AuthZonesHelperMixin):
195
def test_create_zone(self):
    """Create a zone with a fixed serial; check the reply fields and the generated SOA."""
    # soa_edit_api has a default, override with empty for this test
    name, payload, data = self.create_zone(serial=22, soa_edit_api='')
    checked_keys = (
        'id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial',
        'serial', 'edited_serial', 'soa_edit_api', 'soa_edit', 'account',
    )
    for key in checked_keys:
        self.assertIn(key, data)
        if key in payload:
            self.assertEqual(data[key], payload[key])

    # validate generated SOA
    expected_soa = "a.misconfigured.dns.server.invalid. hostmaster.{} {} 10800 3600 604800 3600".format(
        name, payload['serial'])
    self.assertEqual(get_first_rec(data, name, 'SOA')['content'], expected_soa)

    if not is_auth_lmdb():
        # Because we had confusion about dots, check that the DB is without dots.
        soa_rows = get_db_records(name, 'SOA')
        self.assertEqual(soa_rows[0]['content'], expected_soa.replace('. ', ' '))
    self.assertNotEqual(data['serial'], data['edited_serial'])
216
def test_create_zone_with_soa_edit_api(self):
    """With soa_edit_api='EPOCH' the resulting serial must exceed the serial passed in."""
    # soa_edit_api wins over serial
    name, payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
    self.assertIn('soa_edit_api', data)
    if 'soa_edit_api' in payload:
        self.assertEqual(data['soa_edit_api'], payload['soa_edit_api'])
    # generated EPOCH serial surely is > fixed serial we passed in
    print(data)
    self.assertGreater(data['serial'], payload['serial'])
    soa_fields = get_first_rec(data, name, 'SOA')['content'].split(' ')
    soa_serial = int(soa_fields[2])
    self.assertGreater(soa_serial, payload['serial'])
    self.assertEqual(soa_serial, data['serial'])
230
def test_create_zone_with_catalog(self):
    """Create a zone with a catalog and check it in the create reply and in the zone list."""
    name, payload, data = self.create_zone(catalog='catalog.invalid.', serial=10)
    print(data)
    self.assertIn('catalog', data)
    if 'catalog' in payload:
        self.assertEqual(data['catalog'], payload['catalog'])

    # check that the catalog is reflected in the /zones output (#13633)
    listing = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    self.assert_success_json(listing)
    matches = [zone for zone in listing.json() if zone['name'] == name]
    self.assertEqual(len(matches), 1)
    self.assertEqual(matches[0]["catalog"], "catalog.invalid.")
248
def test_create_zone_with_account(self):
    """Create a Master zone with an account and check the account and catalog in the reply."""
    name, payload, data = self.create_zone(account='anaccount', serial=10, kind='Master')
    print(data)
    self.assertIn('account', data)
    if 'account' in payload:
        self.assertEqual(data['account'], payload['account'])

    # as we did not set a catalog in our request, check that the default catalog was applied
    self.assertEqual(data['catalog'], "default-catalog.example.com.")
260
def test_create_zone_default_soa_edit_api(self):
    """A zone created without soa_edit_api must report 'DEFAULT' for it."""
    _, _, created = self.create_zone()
    print(created)
    self.assertEqual(created['soa_edit_api'], 'DEFAULT')
265
def test_create_zone_exists(self):
    """Creating a second zone under an existing name must answer 409."""
    name, _, created = self.create_zone()
    print(created)
    duplicate = {
        'name': name,
        'kind': 'Native'
    }
    print(duplicate)
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(duplicate),
        headers={'content-type': 'application/json'})
    # Conflict - already exists
    self.assertEqual(response.status_code, 409)
279
def test_create_zone_with_soa_edit(self):
    """
    Create a zone with soa_edit and soa_edit_api set, then PATCH in an A
    rrset and check that the last two digits of the SOA serial go from
    '01' to '02'.
    """
    name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
    print(data)
    self.assertEqual(data['soa_edit'], 'INCEPTION-INCREMENT')
    self.assertEqual(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    # These particular settings lead to the first serial set to YYYYMMDD01.
    self.assertEqual(soa_serial[-2:], '01')
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": "127.0.0.1",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + data['id']),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    # Surface a rejected PATCH here rather than as a confusing serial
    # mismatch further down.
    r.raise_for_status()
    data = self.get_zone(data['id'])
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    self.assertEqual(soa_serial[-2:], '02')
308
def test_create_zone_with_records(self):
    """Create a zone together with an apex A rrset and check the record is returned."""
    zone_name = unique_zone_name()
    a_records = [{
        "content": "4.3.2.1",
        "disabled": False,
    }]
    rrset = {
        "name": zone_name,
        "type": "A",
        "ttl": 3600,
        "records": a_records,
    }
    zone_name, _, data = self.create_zone(name=zone_name, rrsets=[rrset])
    # check our record has appeared
    self.assertEqual(get_rrset(data, zone_name, 'A')['records'], rrset['records'])
323
def test_create_zone_with_wildcard_records(self):
    """Create a zone together with a wildcard A rrset and check the record is returned."""
    zone_name = unique_zone_name()
    wildcard_name = "*." + zone_name
    a_records = [{
        "content": "4.3.2.1",
        "disabled": False,
    }]
    rrset = {
        "name": wildcard_name,
        "type": "A",
        "ttl": 3600,
        "records": a_records,
    }
    _, _, data = self.create_zone(name=zone_name, rrsets=[rrset])
    # check our record has appeared
    self.assertEqual(get_rrset(data, rrset['name'], 'A')['records'], rrset['records'])
338
def test_create_zone_with_comments(self):
    """
    Create a zone whose SOA and AAAA rrsets carry comments.

    On LMDB the request is expected to fail with the "does not support
    editing comments" error; otherwise the comments must come back on the
    SOA and AAAA rrsets and be empty on the A and TXT rrsets.
    """
    name = unique_zone_name()
    soa_comments = [{
        "account": "test1",
        "content": "blah blah and test a few non-ASCII chars: ö, €",
        "modified_at": 11112,
    }]
    aaaa_comments = [{
        "account": "test AAAA",
        "content": "blah blah AAAA",
        "modified_at": 11112,
    }]
    soa_rrset = {
        "name": name,
        "type": "soa",  # test uppercasing of type, too.
        "comments": soa_comments,
    }
    aaaa_rrset = {
        "name": name,
        "type": "AAAA",
        "ttl": 3600,
        "records": [{
            "content": "2001:DB8::1",
            "disabled": False,
        }],
        "comments": aaaa_comments,
    }
    txt_rrset = {
        "name": name,
        "type": "TXT",
        "ttl": 3600,
        "records": [{
            "content": "\"test TXT\"",
            "disabled": False,
        }],
    }
    a_rrset = {
        "name": name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "192.0.2.1",
            "disabled": False,
        }],
    }
    rrsets = [soa_rrset, aaaa_rrset, txt_rrset, a_rrset]

    if is_auth_lmdb():
        # No comments in LMDB
        self.create_zone(name=name, rrsets=rrsets, expect_error="Hosting backend does not support editing comments.")
        return

    name, _, data = self.create_zone(name=name, rrsets=rrsets)
    # NS records have been created
    self.assertEqual(len(data['rrsets']), len(rrsets) + 1)
    # check our comment has appeared
    self.assertEqual(get_rrset(data, name, 'SOA')['comments'], soa_comments)
    self.assertEqual(get_rrset(data, name, 'A')['comments'], [])
    self.assertEqual(get_rrset(data, name, 'TXT')['comments'], [])
    self.assertEqual(get_rrset(data, name, 'AAAA')['comments'], aaaa_comments)
398
def test_create_zone_uncanonical_nameservers(self):
    """A nameserver without trailing dot must be rejected with 422."""
    request_body = {
        'name': unique_zone_name(),
        'kind': 'Native',
        'nameservers': ['uncanon.example.com']
    }
    print(request_body)
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Nameserver is not canonical', response.json()['error'])
413
def test_create_auth_zone_no_name(self):
    """A zone with an empty name must be rejected with 422 ("is not canonical")."""
    # No unique zone name is generated here: the request deliberately
    # carries an empty name.
    payload = {
        'name': '',
        'kind': 'Native',
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('is not canonical', r.json()['error'])
427
def test_create_zone_with_custom_soa(self):
    """Create a zone with an explicit SOA rrset and check it is returned (and stored) as given."""
    zone_name = unique_zone_name()
    soa_content = u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
    soa_records = [{
        "content": soa_content,
        "disabled": False,
    }]
    rrset = {
        "name": zone_name,
        "type": "soa",  # test uppercasing of type, too.
        "ttl": 3600,
        "records": soa_records,
    }
    zone_name, _, data = self.create_zone(name=zone_name, rrsets=[rrset], soa_edit_api='')
    self.assertEqual(get_rrset(data, zone_name, 'SOA')['records'], rrset['records'])
    if is_auth_lmdb():
        return
    # compare against the DB row, which is expected without trailing dots
    soa_rows = get_db_records(zone_name, 'SOA')
    self.assertEqual(soa_rows[0]['content'], soa_content.replace('. ', ' '))
445
def test_create_zone_double_dot(self):
    """A zone name containing '..' must be rejected with 422."""
    request_body = {
        'name': 'test..' + unique_zone_name(),
        'kind': 'Native',
        'nameservers': ['ns1.example.com.']
    }
    print(request_body)
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Unable to parse DNS Name', response.json()['error'])
460
def test_create_zone_restricted_chars(self):
    """A zone name containing ':' must be rejected with 422."""
    # : isn't good as a name.
    bad_name = 'test:' + unique_zone_name()
    request_body = {
        'name': bad_name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com']
    }
    print(request_body)
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('contains unsupported characters', response.json()['error'])
475
def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self):
    """Passing both a nameservers list and an apex NS rrset must be rejected with 422."""
    zone_name = unique_zone_name()
    apex_ns = {
        "name": zone_name,
        "type": "NS",
        "ttl": 3600,
        "records": [{
            "content": "ns2.example.com.",
            "disabled": False,
        }],
    }
    request_body = {
        'name': zone_name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.'],
        'rrsets': [apex_ns],
    }
    print(request_body)
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', response.json()['error'])
500
def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self):
    """A nameservers list combined with an NS rrset below the apex must be accepted."""
    zone_name = unique_zone_name()
    delegation = {
        "name": 'subzone.' + zone_name,
        "type": "NS",
        "ttl": 3600,
        "records": [{
            "content": "ns2.example.com.",
            "disabled": False,
        }],
    }
    request_body = {
        'name': zone_name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.'],
        'rrsets': [delegation],
    }
    print(request_body)
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
524
def test_create_zone_with_symbols(self):
    """Create a zone with '/' in its name; the id must carry '=2F' in place of '/'."""
    _, payload, data = self.create_zone(name='foo/bar.' + unique_zone_name())
    name = payload['name']
    expected_id = name.replace('/', '=2F')
    checked_keys = ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial')
    for key in checked_keys:
        self.assertIn(key, data)
        if key in payload:
            self.assertEqual(data[key], payload[key])
    self.assertEqual(data['id'], expected_id)
    if is_auth_lmdb():
        return
    soa_rows = get_db_records(name, 'SOA')
    self.assertEqual(soa_rows[0]['name'], name.rstrip('.'))
537
def test_create_zone_with_nameservers_non_string(self):
    """A nameservers entry that is not a string must yield 422."""
    # ensure we don't crash
    request_body = {
        'name': unique_zone_name(),
        'kind': 'Native',
        'nameservers': [{'a': 'ns1.example.com'}]  # invalid
    }
    print(request_body)
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
552
def test_create_zone_with_dnssec(self):
    """
    Create a zone with "dnssec" set and see if a key was made.
    """
    # The zone name comes from create_zone(); no separate name is generated.
    name, payload, data = self.create_zone(dnssec=True)

    # get_zone asserts that fetching the zone succeeds
    self.get_zone(name)

    for k in ('dnssec', ):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))

    keys = r.json()

    print(keys)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(keys), 1)
    self.assertEqual(keys[0]['type'], 'Cryptokey')
    self.assertEqual(keys[0]['active'], True)
    self.assertEqual(keys[0]['keytype'], 'csk')
578
def test_create_zone_with_dnssec_disable_dnssec(self):
    """
    Create a zone with "dnssec", then set "dnssec" to false and see if the
    keys are gone
    """
    # The zone name comes from create_zone(); no separate name is generated.
    name, payload, data = self.create_zone(dnssec=True)

    self.put_zone(name, {'dnssec': False})

    zoneinfo = self.get_zone(name)
    self.assertEqual(zoneinfo['dnssec'], False)

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))

    keys = r.json()

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(keys), 0)
598
def test_create_zone_with_nsec3param(self):
    """
    Create a zone with "nsec3param" set and see if the metadata was added.
    """
    nsec3param = '1 0 100 aabbccddeeff'
    # The zone name comes from create_zone(); no separate name is generated.
    name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)

    # get_zone asserts that fetching the zone succeeds
    self.get_zone(name)

    for k in ('dnssec', 'nsec3param'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3PARAM'))

    data = r.json()

    print(data)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(data['metadata']), 1)
    self.assertEqual(data['kind'], 'NSEC3PARAM')
    self.assertEqual(data['metadata'][0], nsec3param)
624
def test_create_zone_with_nsec3narrow(self):
    """
    Create a zone with "nsec3narrow" set and see if the metadata was added.
    """
    nsec3param = '1 0 100 aabbccddeeff'
    # The zone name comes from create_zone(); no separate name is generated.
    name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
                                           nsec3narrow=True)

    # get_zone asserts that fetching the zone succeeds
    self.get_zone(name)

    for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3NARROW'))

    data = r.json()

    print(data)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(data['metadata']), 1)
    self.assertEqual(data['kind'], 'NSEC3NARROW')
    self.assertEqual(data['metadata'][0], '1')
651
def test_create_zone_with_nsec3param_switch_to_nsec(self):
    """
    Create a zone with "nsec3param", then PUT an empty "nsec3param" and
    check the zone reports it as empty.
    """
    zone_name, _, _ = self.create_zone(dnssec=True, nsec3param='1 0 1 ab')
    self.put_zone(zone_name, {'nsec3param': ''})

    refreshed = self.get_zone(zone_name)
    self.assertEqual(refreshed['nsec3param'], '')
662
def test_create_zone_without_dnssec_unset_nsec3parm(self):
    """
    Create a non dnssec zone; a PUT with an empty "nsec3param" must succeed.
    """
    zone_name, _, _ = self.create_zone(dnssec=False)
    empty_nsec3 = {'nsec3param': ''}
    self.put_zone(zone_name, empty_nsec3)
669
def test_create_zone_without_dnssec_set_nsec3parm(self):
    """
    Create a non dnssec zone; a PUT that sets "nsec3param" must be rejected.
    """
    zone_name, _, _ = self.create_zone(dnssec=False)
    nsec3_settings = {'nsec3param': '1 0 1 ab'}
    self.put_zone(zone_name, nsec3_settings, expect_error=True)
676
def test_create_zone_dnssec_serial(self):
    """
    Create a zone, then set and unset "dnssec", checking after every step
    that the last two digits of the SOA serial advance ('01', '02', '03').
    """
    name, _, data = self.create_zone()

    def serial_suffix(zone):
        # third field of the SOA content is the serial; keep its last two digits
        return get_first_rec(zone, name, 'SOA')['content'].split(' ')[2][-2:]

    self.assertEqual(serial_suffix(data), '01')

    self.put_zone(name, {'dnssec': True})
    self.assertEqual(serial_suffix(self.get_zone(name)), '02')

    self.put_zone(name, {'dnssec': False})
    self.assertEqual(serial_suffix(self.get_zone(name)), '03')
698
def test_zone_absolute_url(self):
    """The 'url' of the first listed zone must start with '/api/v'."""
    self.create_zone()
    listing = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
    first_zone = listing[0]
    print(first_zone)
    self.assertTrue(first_zone['url'].startswith('/api/v'))
705
def test_create_zone_metadata(self):
    """POST an AXFR-SOURCE metadata entry for example.com and check the 201 reply echoes it."""
    wanted = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    endpoint = self.url("/api/v1/servers/localhost/zones/example.com/metadata")
    response = self.session.post(endpoint, data=json.dumps(wanted))
    body = response.json()
    self.assertEqual(response.status_code, 201)
    self.assertEqual(body["metadata"], wanted["metadata"])
713
def test_create_zone_metadata_kind(self):
    """PUT AXFR-SOURCE metadata for example.com and check the 200 reply echoes it."""
    wanted = {"metadata": ["127.0.0.2"]}
    endpoint = self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE")
    response = self.session.put(endpoint, data=json.dumps(wanted))
    body = response.json()
    self.assertEqual(response.status_code, 200)
    self.assertEqual(body["metadata"], wanted["metadata"])
721
def test_create_protected_zone_metadata(self):
    """A PUT to any of the protected metadata kinds must answer 422."""
    # test whether it prevents modification of certain kinds
    protected_kinds = ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT")
    for kind in protected_kinds:
        body = json.dumps({"metadata": ["FOO", "BAR"]})
        endpoint = self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % kind)
        response = self.session.put(endpoint, data=body)
        self.assertEqual(response.status_code, 422)
729
def test_retrieve_zone_metadata(self):
    """POST an AXFR-SOURCE entry, then check it appears in the metadata listing."""
    wanted = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    endpoint = "/api/v1/servers/localhost/zones/example.com/metadata"
    self.session.post(self.url(endpoint), data=json.dumps(wanted))
    response = self.session.get(self.url(endpoint))
    listing = response.json()
    self.assertEqual(response.status_code, 200)
    self.assertIn(wanted, listing)
738
def test_delete_zone_metadata(self):
    """DELETE the AXFR-SOURCE metadata (204), then check a GET returns an empty list."""
    endpoint = "/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"
    deleted = self.session.delete(self.url(endpoint))
    self.assertEqual(deleted.status_code, 204)
    fetched = self.session.get(self.url(endpoint))
    body = fetched.json()
    self.assertEqual(fetched.status_code, 200)
    self.assertEqual(body["metadata"], [])
746
def test_create_external_zone_metadata(self):
    """PUT X-MYMETA metadata for example.com and check the 200 reply echoes it."""
    wanted = {"metadata": ["My very important message"]}
    endpoint = self.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA")
    response = self.session.put(endpoint, data=json.dumps(wanted))
    self.assertEqual(response.status_code, 200)
    body = response.json()
    self.assertEqual(body["metadata"], wanted["metadata"])
754
def test_create_metadata_in_non_existent_zone(self):
    """POSTing metadata for a zone that does not exist must answer 404."""
    wanted = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    endpoint = self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata")
    response = self.session.post(endpoint, data=json.dumps(wanted))
    self.assertEqual(response.status_code, 404)
    # Note: errors should probably contain json (see #5988)
    # self.assertIn('Could not find domain ', r.json()['error'])
762
def test_create_slave_zone(self):
    """Create a Slave zone without nameservers; check it is listed and fetchable with no data."""
    compared_keys = ('name', 'masters', 'kind')

    # Test that nameservers can be absent for slave zones.
    _, payload, created = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    for key in compared_keys:
        self.assertIn(key, created)
        self.assertEqual(created[key], payload[key])
    print("payload:", payload)
    print("data:", created)

    # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
    zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
    print("zonelist:", zonelist)
    self.assertIn(payload['name'], [zone['name'] for zone in zonelist])

    # Also test that fetching the zone works.
    fetched = self.get_zone(created['id'])
    print("zone (fetched):", fetched)
    for key in compared_keys:
        self.assertIn(key, fetched)
        self.assertEqual(fetched[key], payload[key])
    self.assertEqual(fetched['serial'], 0)
    self.assertEqual(fetched['rrsets'], [])
784
def test_create_consumer_zone(self):
    """Create a Consumer zone without nameservers; check it is listed and fetchable with no data."""
    # Test that nameservers can be absent for consumer zones.
    _, payload, created = self.create_zone(kind='Consumer', nameservers=None, masters=['127.0.0.2'])
    print("payload:", payload)
    print("data:", created)

    # Because consumer zones don't get a SOA, we need to test that they'll show up in the zone list.
    zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
    print("zonelist:", zonelist)
    self.assertIn(payload['name'], [zone['name'] for zone in zonelist])

    # Also test that fetching the zone works.
    zone_url = self.url("/api/v1/servers/localhost/zones/" + created['id'])
    fetched = self.session.get(zone_url).json()
    print("zone (fetched):", fetched)
    for key in ('name', 'masters', 'kind'):
        self.assertIn(key, fetched)
        self.assertEqual(fetched[key], payload[key])
    self.assertEqual(fetched['serial'], 0)
    self.assertEqual(fetched['rrsets'], [])
804
def test_create_consumer_zone_no_nameservers(self):
    """nameservers must be absent for Consumer zones"""
    expected_message = "Nameservers MUST NOT be given for Consumer zones"
    self.create_zone(kind="Consumer", nameservers=["127.0.0.1"], expect_error=expected_message)
808
def test_create_consumer_zone_no_rrsets(self):
    """rrsets must be absent for Consumer zones"""
    soa_rrset = {
        "name": "$NAME$",
        "type": "SOA",
        "ttl": 3600,
        "records": [{
            "content": "ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600",
            "disabled": False,
        }],
    }
    expected_message = "Zone data MUST NOT be given for Consumer zones"
    self.create_zone(kind="Consumer", nameservers=None, rrsets=[soa_rrset], expect_error=expected_message)
821
def test_find_zone_by_name(self):
    """Look a zone up through the '?zone=' filter and check the first hit carries its name."""
    requested_name = 'foo/' + unique_zone_name()
    name, _, _ = self.create_zone(name=requested_name)
    response = self.session.get(self.url("/api/v1/servers/localhost/zones?zone=" + name))
    found = response.json()
    print(found)
    self.assertEqual(found[0]['name'], name)
829
def test_delete_slave_zone(self):
    """Create a Slave zone and delete it; an HTTP error status fails the test."""
    _, _, created = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    zone_url = self.url("/api/v1/servers/localhost/zones/" + created['id'])
    response = self.session.delete(zone_url)
    response.raise_for_status()
834
def test_delete_consumer_zone(self):
    """Create a Consumer zone and delete it; an HTTP error status fails the test."""
    _, _, created = self.create_zone(kind='Consumer', nameservers=None, masters=['127.0.0.2'])
    zone_url = self.url("/api/v1/servers/localhost/zones/" + created['id'])
    response = self.session.delete(zone_url)
    response.raise_for_status()
839
def test_retrieve_slave_zone(self):
    """PUT axfr-retrieve on a Slave zone and check the result message."""
    _, payload, created = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    print("payload:", payload)
    print("data:", created)
    retrieve_url = self.url("/api/v1/servers/localhost/zones/" + created['id'] + "/axfr-retrieve")
    status = self.session.put(retrieve_url).json()
    print("status for axfr-retrieve:", status)
    expected_result = "Added retrieval request for '{}' from primary 127.0.0.2".format(payload['name'])
    self.assertEqual(status['result'], expected_result)
849
def test_notify_master_zone(self):
    """PUT notify on a Master zone and check the result message."""
    _, payload, created = self.create_zone(kind='Master')
    print("payload:", payload)
    print("data:", created)
    notify_url = self.url("/api/v1/servers/localhost/zones/" + created['id'] + "/notify")
    status = self.session.put(notify_url).json()
    print("status for notify:", status)
    self.assertEqual(status['result'], 'Notification queued')
858
def test_get_zone_with_symbols(self):
    """Fetch a zone with '/' in its name through the id that carries '=2F' in place of '/'."""
    _, payload, _ = self.create_zone(name='foo/bar.' + unique_zone_name())
    zone_id = payload['name'].replace('/', '=2F')
    fetched = self.get_zone(zone_id)
    checked_keys = ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec')
    for key in checked_keys:
        self.assertIn(key, fetched)
        if key in payload:
            self.assertEqual(fetched[key], payload[key])
868
def test_get_zone(self):
    """Fetch example.com. by the id found in the zone list and check its fields."""
    listing = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
    example_com = [zone for zone in listing if zone['name'] == u'example.com.'][0]
    fetched = self.get_zone(example_com['id'])
    checked_keys = ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial')
    for key in checked_keys:
        self.assertIn(key, fetched)
    self.assertEqual(fetched['name'], 'example.com.')
877
def test_get_zone_rrset(self):
    """Fetch zones filtered by rrset_name (and rrset_type) and check the rrsets returned."""
    checked_keys = ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets')

    def single_record_rrset(owner, rtype, ttl, content):
        # expected shape of an rrset holding one enabled record and no comments
        return {
            'comments': [],
            'name': owner,
            'records': [{'content': content, 'disabled': False}],
            'ttl': ttl,
            'type': rtype,
        }

    def assert_has_checked_keys(zone):
        for key in checked_keys:
            self.assertIn(key, zone)

    domains = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
    example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]

    # verify single record from name that has a single record
    data = self.get_zone(example_com['id'], rrset_name="host-18000.example.com.")
    assert_has_checked_keys(data)
    self.assertEqual(
        data['rrsets'],
        [single_record_rrset('host-18000.example.com.', 'A', 120, '192.168.1.80')],
    )

    # verify two RRsets from a name that has two types with one record each
    powerdnssec_org = [domain for domain in domains if domain['name'] == u'powerdnssec.org.'][0]
    data = self.get_zone(powerdnssec_org['id'], rrset_name="localhost.powerdnssec.org.")
    assert_has_checked_keys(data)
    self.assertEqual(
        sorted(data['rrsets'], key=operator.itemgetter('type')),
        [
            single_record_rrset('localhost.powerdnssec.org.', 'A', 3600, '127.0.0.1'),
            single_record_rrset('localhost.powerdnssec.org.', 'AAAA', 3600, '::1'),
        ],
    )

    # verify one RRset with one record from a name that has two, then filtered by type
    data = self.get_zone(powerdnssec_org['id'], rrset_name="localhost.powerdnssec.org.", rrset_type="AAAA")
    assert_has_checked_keys(data)
    self.assertEqual(
        data['rrsets'],
        [single_record_rrset('localhost.powerdnssec.org.', 'AAAA', 3600, '::1')],
    )
962
def test_import_zone_broken(self):
    """Creating a zone from malformed zone text must be refused with 422.

    The second line of the pasted dig output lacks its leading ';;', so it
    is not a comment.
    """
    zone_text = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1680
;; QUESTION SECTION:
;powerdns.com. IN SOA

;; ANSWER SECTION:
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
powerdns-broken.com. 86400 IN A 82.94.213.34
powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
    request_body = {
        'name': 'powerdns-broken.com',
        'kind': 'Master',
        'nameservers': [],
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
993
def test_import_zone_axfr_outofzone(self):
    """Zone text holding a record outside the zone must be rejected.

    The AAAA record for example.org. does not belong to the freshly named
    zone, so the API has to answer 422 with an out-of-zone error.
    """
    # Ensure we don't create out-of-zone records
    zone_name = unique_zone_name()
    zone_text = """
%NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
%NAME% 3600 IN NS powerdnssec2.ds9a.nl.
example.org. 3600 IN AAAA 2001:888:2000:1d::2
%NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
""".replace('%NAME%', zone_name)
    request_body = {
        'name': zone_name,
        'kind': 'Master',
        'nameservers': [],
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertEqual(response.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
1013
def test_import_zone_axfr(self):
    """Create powerdns.com. from dig/AXFR-style text and check the stored RRsets.

    The SOA appears twice in the input (as in an AXFR) but is expected only
    once in the result. On non-LMDB backends the raw database content is
    also checked to be stored without a trailing dot.
    """
    zone_text = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1680
;; QUESTION SECTION:
;powerdns.com. IN SOA

;; ANSWER SECTION:
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
powerdns.com. 86400 IN A 82.94.213.34
powerdns.com. 3600 IN MX 0 xs.powerdns.com.
powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
    request_body = {
        'name': 'powerdns.com.',
        'kind': 'Master',
        'nameservers': [],
        'soa_edit_api': '',  # turn off so exact SOA comparison works.
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
    created = response.json()
    self.assertIn('name', created)

    expected_by_type = {
        'NS': [
            {'content': 'powerdnssec1.ds9a.nl.'},
            {'content': 'powerdnssec2.ds9a.nl.'},
        ],
        'SOA': [
            {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
        ],
        'MX': [
            {'content': '0 xs.powerdns.com.'},
        ],
        'A': [
            {'content': '82.94.213.34', 'name': 'powerdns.com.'},
        ],
        'AAAA': [
            {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
        ],
    }
    eq_zone_rrsets(created['rrsets'], expected_by_type)

    if is_auth_lmdb():
        # The raw-database check below is not performed for the LMDB backend.
        return
    # check content in DB is stored WITHOUT trailing dot.
    ns_rows = get_db_records(request_body['name'], 'NS')
    first_ns = next((row for row in ns_rows if row['content'].startswith('powerdnssec1')))
    self.assertEqual(first_ns['content'], 'powerdnssec1.ds9a.nl')
1074
def test_import_zone_bind(self):
    """Create example.org. from BIND-style zone text and check the stored RRsets.

    The input uses $TTL, $ORIGIN, a parenthesised multi-line SOA, relative
    owner names and owner-less lines, all of which must be expanded into
    fully qualified records.
    """
    # NOTE(review): the lines that start with blanks rely on the zone-file rule
    # that a leading blank means "continuation / same owner as before" --
    # confirm the exact spacing against the upstream file.
    zone_text = """
$TTL 86400 ; 24 hours could have been written as 24h or 1d
; $TTL used for all RRs without explicit TTL value
$ORIGIN example.org.
@ 1D IN SOA ns1.example.org. hostmaster.example.org. (
                  2002022401 ; serial
                  3H ; refresh
                  15 ; retry
                  1w ; expire
                  3h ; minimum
                 )
       IN NS ns1.example.org. ; in the domain
       IN NS ns2.smokeyjoe.com. ; external to domain
       IN MX 10 mail.another.com. ; external mail provider
; server host definitions
ns1 IN A 192.168.0.1 ;name server definition
www IN A 192.168.0.2 ;web server definition
ftp IN CNAME www.example.org. ;ftp server definition
; non server domain hosts
bill IN A 192.168.0.3
fred IN A 192.168.0.4
"""
    request_body = {
        'name': 'example.org.',
        'kind': 'Master',
        'nameservers': [],
        'soa_edit_api': '',  # turn off so exact SOA comparison works.
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
    created = response.json()
    self.assertIn('name', created)

    expected_by_type = {
        'NS': [
            {'content': 'ns1.example.org.'},
            {'content': 'ns2.smokeyjoe.com.'},
        ],
        'SOA': [
            {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
        ],
        'MX': [
            {'content': '10 mail.another.com.'},
        ],
        'A': [
            {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
            {'content': '192.168.0.2', 'name': 'www.example.org.'},
            {'content': '192.168.0.3', 'name': 'bill.example.org.'},
            {'content': '192.168.0.4', 'name': 'fred.example.org.'},
        ],
        'CNAME': [
            {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
        ],
    }
    eq_zone_rrsets(created['rrsets'], expected_by_type)
1135
def test_import_zone_bind_cname_apex(self):
    """Zone text that puts a CNAME next to SOA/NS at the apex must be refused.

    The API is expected to answer 422 and mention the RRset conflict.
    """
    zone_name = unique_zone_name()
    zone_text = """
$ORIGIN %NAME%
@ IN SOA ns1.example.org. hostmaster.example.org. (2002022401 3H 15 1W 3H)
@ IN NS ns1.example.org.
@ IN NS ns2.smokeyjoe.com.
@ IN CNAME www.example.org.
""".replace('%NAME%', zone_name)
    request_body = {
        'name': zone_name,
        'kind': 'Master',
        'nameservers': [],
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(request_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Conflicts with another RRset', response.json()['error'])
1155
def test_export_zone_json(self):
    """Export a fresh zone as JSON and compare the zone text line by line.

    The Accept header prefers application/json; the answer must carry the
    zone text under the 'zone' key. Line order is not significant.
    """
    name, _, _ = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    export_url = self.url("/api/v1/servers/localhost/zones/" + name + "/export")
    response = self.session.get(
        export_url,
        headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
    )
    self.assert_success_json(response)
    exported = response.json()
    self.assertIn('zone', exported)
    ns1_line = name + '\t3600\tIN\tNS\tns1.foo.com.'
    ns2_line = name + '\t3600\tIN\tNS\tns2.foo.com.'
    soa_line = (name + '\t3600\tIN\tSOA\ta.misconfigured.dns.server.invalid. hostmaster.' + name +
                ' 0 10800 3600 604800 3600')
    exported_lines = exported['zone'].strip().split('\n')
    self.assertCountEqual(exported_lines, [ns1_line, ns2_line, soa_line])
1171
def test_import_zone_consumer(self):
    """Supplying zone text when creating a Consumer zone must be an error."""
    # NOTE(review): indentation of the SOA continuation lines is reconstructed;
    # confirm the exact spacing against the upstream file.
    soa_only_zone = """
$NAME$ 1D IN SOA ns1.example.org. hostmaster.example.org. (
                  2002022401 ; serial
                  3H ; refresh
                  15 ; retry
                  1w ; expire
                  3h ; minimum
                 )
"""
    self.create_zone(
        kind="Consumer",
        nameservers=[],
        zone=soa_only_zone,
        expect_error="Zone data MUST NOT be given for Consumer zones")
1183
def test_export_zone_text(self):
    """Export a fresh zone as plain text and compare it line by line.

    With 'accept: */*' the response body itself is the zone text. Line
    order is not significant.
    """
    name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    r = self.session.get(
        self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
        headers={'accept': '*/*'}
    )
    # Check the status first, as the JSON export test does, so that a failed
    # request is reported as such rather than as a confusing mismatch between
    # an error body and the expected zone lines.
    self.assert_success(r)
    data = r.text.strip().split("\n")
    expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
                     name + '\t3600\tIN\tNS\tns2.foo.com.',
                     name + '\t3600\tIN\tSOA\ta.misconfigured.dns.server.invalid. hostmaster.' + name +
                     ' 0 10800 3600 604800 3600']
    self.assertCountEqual(data, expected_data)
1197
def test_update_zone(self):
    """PUT zone metadata twice and read it back each time.

    First the zone becomes a Master with masters, catalog and SOA-EDIT
    settings; then it goes back to Native with those settings emptied.
    Every submitted key must be echoed by a subsequent GET.
    """
    _, created, _ = self.create_zone()
    name = created['name']
    # update, set as Master and enable SOA-EDIT-API
    to_master = {
        'kind': 'Master',
        'masters': ['192.0.2.1', '192.0.2.2'],
        'catalog': 'catalog.invalid.',
        'soa_edit_api': 'EPOCH',
        'soa_edit': 'EPOCH'
    }
    # update, back to Native and empty(off)
    to_native = {
        'kind': 'Native',
        'catalog': '',
        'soa_edit_api': '',
        'soa_edit': ''
    }
    for update in (to_master, to_native):
        self.put_zone(name, update)
        current = self.get_zone(name)
        for key, value in update.items():
            self.assertIn(key, current)
            self.assertEqual(current[key], value)
1226
def test_zone_rr_update(self):
    """Replace the apex NS RRset and verify exactly the new records remain.

    The type is sent in lower case ('ns') while the result is looked up as
    'NS'; one of the two records is submitted as disabled.
    """
    zone_name, _, _ = self.create_zone()
    new_records = [
        {"content": "ns1.bar.com.", "disabled": False},
        {"content": "ns2-disabled.bar.com.", "disabled": True},
    ]
    # do a replace (= update)
    change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': 'ns',
        'ttl': 3600,
        'records': new_records,
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # verify that (only) the new record is there
    zone_data = self.get_zone(zone_name)
    stored = get_rrset(zone_data, zone_name, 'NS')['records']
    self.assertCountEqual(stored, new_records)
1255
def test_zone_rr_update_mx(self):
    """Replace the apex MX RRset and read the record back unchanged."""
    # Important to test with MX records, as they have a priority field, which must end up in the content field.
    zone_name, _, _ = self.create_zone()
    mx_records = [{"content": "10 mail.example.org.", "disabled": False}]
    # do a replace (= update)
    change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': 'MX',
        'ttl': 3600,
        'records': mx_records,
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # verify that (only) the new record is there
    zone_data = self.get_zone(zone_name)
    self.assertEqual(get_rrset(zone_data, zone_name, 'MX')['records'], mx_records)
1281
def test_zone_rr_update_invalid_mx(self):
    """An MX target containing '@' must be rejected and leave no MX RRset."""
    zone_name, _, _ = self.create_zone()
    # do a replace (= update)
    change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': 'MX',
        'ttl': 3600,
        'records': [
            {"content": "10 mail@mx.example.org.", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('non-hostname content', response.json()['error'])
    # The rejected change must not have created anything.
    zone_data = self.get_zone(zone_name)
    self.assertIsNone(get_rrset(zone_data, zone_name, 'MX'))
1306
def test_zone_rr_update_opt(self):
    """Storing an OPT RRset must be refused with 422 ('invalid type given')."""
    zone_name, _, _ = self.create_zone()
    # do a replace (= update)
    change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': 'OPT',
        'ttl': 3600,
        'records': [
            {"content": "9", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('OPT: invalid type given', response.json()['error'])
1329
def test_zone_rr_update_multiple_rrsets(self):
    """Replace the apex NS and MX RRsets in one PATCH and verify both."""
    zone_name, _, _ = self.create_zone()

    def replace_at_apex(rtype, content):
        # One 'replace' change holding a single enabled record at the apex.
        return {
            'changetype': 'replace',
            'name': zone_name,
            'type': rtype,
            'ttl': 3600,
            'records': [
                {"content": content, "disabled": False},
            ],
        }

    ns_change = replace_at_apex('NS', "ns9999.example.com.")
    mx_change = replace_at_apex('MX', "10 mx444.example.com.")
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [ns_change, mx_change]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # verify that all rrsets have been updated
    zone_data = self.get_zone(zone_name)
    self.assertEqual(get_rrset(zone_data, zone_name, 'NS')['records'], ns_change['records'])
    self.assertEqual(get_rrset(zone_data, zone_name, 'MX')['records'], mx_change['records'])
1367
def test_zone_rr_update_duplicate_record(self):
    """An RRset listing the same record twice must be rejected with 422.

    'ns9999.example.com.' is both the first and the last of five records.
    """
    zone_name, _, _ = self.create_zone()
    targets = (
        "ns9999.example.com.",
        "ns9996.example.com.",
        "ns9987.example.com.",
        "ns9988.example.com.",
        "ns9999.example.com.",
    )
    change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': 'NS',
        'ttl': 3600,
        'records': [{"content": target, "disabled": False} for target in targets],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Duplicate record in RRset', response.json()['error'])
1390
def test_zone_rr_update_duplicate_rrset(self):
    """Two 'replace' changes for the same name and type in one PATCH must fail."""
    zone_name, _, _ = self.create_zone()
    # Both changes address the apex NS RRset; only the record content differs.
    changes = [
        {
            'changetype': 'replace',
            'name': zone_name,
            'type': 'NS',
            'ttl': 3600,
            'records': [
                {"content": target, "disabled": False},
            ],
        }
        for target in ("ns9999.example.com.", "ns9998.example.com.")
    ]
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': changes}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Duplicate RRset', response.json()['error'])
1424
def test_zone_rr_delete(self):
    """Delete the apex NS RRset and verify it is no longer returned."""
    zone_name, _, _ = self.create_zone()
    # do a delete of all NS records (these are created with the zone)
    deletion = {
        'changetype': 'delete',
        'name': zone_name,
        'type': 'NS'
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [deletion]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # verify that the records are gone
    zone_data = self.get_zone(zone_name)
    self.assertIsNone(get_rrset(zone_data, zone_name, 'NS'))
1442
def test_zone_rr_update_rrset_combine_replace_and_delete(self):
    """A delete followed by a replace of the same RRset in one PATCH succeeds.

    Afterwards the CNAME at sub.<zone> must hold the replacement record.
    """
    zone_name, _, _ = self.create_zone()
    owner = 'sub.' + zone_name
    cname_records = [{"content": "www.example.org.", "disabled": False}]
    deletion = {
        'changetype': 'delete',
        'name': owner,
        'type': 'CNAME',
    }
    replacement = {
        'changetype': 'replace',
        'name': owner,
        'type': 'CNAME',
        'ttl': 500,
        'records': cname_records,
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [deletion, replacement]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # verify that (only) the new record is there
    zone_data = self.get_zone(zone_name)
    self.assertEqual(get_rrset(zone_data, 'sub.' + zone_name, 'CNAME')['records'], cname_records)
1471
def test_zone_disable_reenable(self):
    """Disable a zone through its SOA, then re-enable it.

    After each PATCH the stored SOA serial must differ from the submitted
    value '1', and the second serial must differ from the first. The zone
    must stay in the zone list while its SOA is disabled.
    """
    # This also tests that SOA-EDIT-API works.
    zone_name, _, _ = self.create_zone(soa_edit_api='EPOCH')
    zone_url_path = "/api/v1/servers/localhost/zones/" + zone_name
    soa_record = {
        "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
        "disabled": True
    }
    soa_change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': 'SOA',
        'ttl': 3600,
        'records': [soa_record],
    }

    def stored_serial():
        # Third whitespace-separated field of the first stored SOA record.
        zone_data = self.get_zone(zone_name)
        return get_first_rec(zone_data, zone_name, 'SOA')['content'].split()[2]

    # disable zone by disabling SOA
    response = self.session.patch(
        self.url(zone_url_path),
        data=json.dumps({'rrsets': [soa_change]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # check SOA serial has been edited
    first_serial = stored_serial()
    self.assertNotEqual(first_serial, '1')
    # make sure domain is still in zone list (disabled SOA!)
    listing = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    same_name = [entry for entry in listing.json() if entry['name'] == zone_name]
    self.assertEqual(len(same_name), 1)
    # sleep 1sec to ensure the EPOCH value changes for the next request
    time.sleep(1)
    # verify that modifying it still works
    soa_record['disabled'] = False
    response = self.session.patch(
        self.url(zone_url_path),
        data=json.dumps({'rrsets': [soa_change]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # check SOA serial has been edited again
    second_serial = stored_serial()
    self.assertNotEqual(second_serial, '1')
    self.assertNotEqual(second_serial, first_serial)
1517
def test_zone_rr_update_out_of_zone(self):
    """A replace whose owner name lies outside the zone must be rejected."""
    zone_name, _, _ = self.create_zone()
    # replace with qname mismatch
    change = {
        'changetype': 'replace',
        'name': 'not-in-zone.',
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {"content": "ns1.bar.com.", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('out of zone', response.json()['error'])
1540
def test_zone_rr_update_restricted_chars(self):
    """An owner name containing ':' must be rejected as unsupported."""
    zone_name, _, _ = self.create_zone()
    # replace using an owner name that carries a ':' in its first label
    change = {
        'changetype': 'replace',
        'name': 'test:' + zone_name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {"content": "ns1.bar.com.", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('contains unsupported characters', response.json()['error'])
1563
def test_rrset_unknown_type(self):
    """A record type the server does not know ('FAFAFA') must yield 422."""
    zone_name, _, _ = self.create_zone()
    change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': 'FAFAFA',
        'ttl': 3600,
        'records': [
            {"content": "4.3.2.1", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('unknown type', response.json()['error'])
1583
@parameterized.expand([
    ('CNAME', ),
])
def test_rrset_exclusive_and_other(self, qtype):
    """Adding an exclusive type at a name that already has RRsets must fail.

    The change targets the zone apex, which already holds RRsets right
    after creation, so the API must report a conflict.
    """
    zone_name, _, _ = self.create_zone()
    change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {"content": "example.org.", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Conflicts with pre-existing RRset', response.json()['error'])
1606
@parameterized.expand([
    ('CNAME', ),
])
def test_rrset_other_and_exclusive(self, qtype):
    """Adding another type next to an existing exclusive RRset must fail.

    The exclusive RRset is first created at sub.<zone>; a later A RRset at
    the same name must then be refused as conflicting.
    """
    zone_name, _, _ = self.create_zone()
    owner = 'sub.' + zone_name
    json_headers = {'content-type': 'application/json'}

    exclusive_change = {
        'changetype': 'replace',
        'name': owner,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {"content": "example.org.", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [exclusive_change]}),
        headers=json_headers)
    self.assert_success(response)

    address_change = {
        'changetype': 'replace',
        'name': owner,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {"content": "1.2.3.4", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [address_change]}),
        headers=json_headers)
    self.assertEqual(response.status_code, 422)
    self.assertIn('Conflicts with pre-existing RRset', response.json()['error'])
1645
@parameterized.expand([
    ('', 'SOA', ['ns1.example.org. test@example.org. 10 10800 3600 604800 3600', 'ns2.example.org. test@example.org. 10 10800 3600 604800 3600']),
    ('sub.', 'CNAME', ['01.example.org.', '02.example.org.']),
])
def test_rrset_single_qtypes(self, label, qtype, contents):
    """Types limited to a single record must reject an RRset holding two.

    label is prepended to the zone name to form the owner; contents
    supplies the two record contents that are submitted together.
    """
    zone_name, _, _ = self.create_zone()
    first_content = contents[0]
    second_content = contents[1]
    change = {
        'changetype': 'replace',
        'name': label + zone_name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {"content": first_content, "disabled": False},
            {"content": second_content, "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    expected_fragment = 'IN {} has more than one record'.format(qtype)
    self.assertIn(expected_fragment, response.json()['error'])
1673
def test_rrset_zone_apex(self):
    """A SOA and a DNAME may be replaced together at the zone apex."""
    zone_name, _, _ = self.create_zone()
    soa_change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': 'SOA',
        'ttl': 3600,
        'records': [
            {"content": 'ns1.example.org. test@example.org. 10 10800 3600 604800 3600', "disabled": False},
        ],
    }
    dname_change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': 'DNAME',
        'ttl': 3600,
        'records': [
            {"content": 'example.com.', "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [soa_change, dname_change]}),
        headers={'content-type': 'application/json'})
    # user should be able to create DNAME at APEX as per RFC 6672 section 2.3
    self.assert_success(response)
1705
@parameterized.expand([
    ('SOA', 'ns1.example.org. test@example.org. 10 10800 3600 604800 1800'),
    ('DNSKEY', '257 3 8 AwEAAb/+pXOZWYQ8mv9WM5dFva8WU9jcIUdDuEjldbyfnkQ/xlrJC5zAEfhYhrea3SmIPmMTDimLqbh3/4SMTNPTUF+9+U1vpNfIRTFadqsmuU9Fddz3JqCcYwEpWbReg6DJOeyu+9oBoIQkPxFyLtIXEPGlQzrynKubn04Cx83I6NfzDTraJT3jLHKeW5PVc1ifqKzHz5TXdHHTA7NkJAa0sPcZCoNE1LpnJI/wcUpRUiuQhoLFeT1E432GuPuZ7y+agElGj0NnBxEgnHrhrnZWUbULpRa/il+Cr5Taj988HqX9Xdm6FjcP4Lbuds/44U7U8du224Q8jTrZ57Yvj4VDQKc='),
])
def test_only_at_apex(self, qtype, content):
    """Apex-only types are accepted at the apex and refused below it.

    The same record is first stored at the zone name (must succeed and be
    readable) and then at sub.<zone> (must fail and leave nothing behind).
    """
    zone_name, _, _ = self.create_zone(soa_edit_api='')
    below_apex = 'sub.' + zone_name
    json_headers = {'content-type': 'application/json'}

    apex_change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {"content": content, "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [apex_change]}),
        headers=json_headers)
    self.assert_success(response)
    # verify that the new record is there
    zone_data = self.get_zone(zone_name)
    self.assertEqual(get_rrset(zone_data, zone_name, qtype)['records'], apex_change['records'])

    sub_change = {
        'changetype': 'replace',
        'name': below_apex,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {"content": content, "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [sub_change]}),
        headers=json_headers)
    self.assertEqual(response.status_code, 422)
    self.assertIn('only allowed at apex', response.json()['error'])
    zone_data = self.get_zone(zone_name)
    self.assertIsNone(get_rrset(zone_data, below_apex, qtype))
1753
@parameterized.expand([
    ('DS', '44030 8 2 d4c3d5552b8679faeebc317e5f048b614b2e5f607dc57f1553182d49ab2179f7'),
])
def test_not_allowed_at_apex(self, qtype, content):
    """Types forbidden at the apex are refused there and accepted below it.

    The record is first submitted at the zone name (must fail with 422 and
    leave no RRset of that type at the apex) and then at sub.<zone> (must
    succeed and be readable).
    """
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": content,
                "disabled": False
            },
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('not allowed at apex', r.json()['error'])
    data = self.get_zone(name)
    # The rejected change targeted the apex, so that is where the RRset must be
    # absent. Looking at 'sub.' + name here would pass trivially, because
    # nothing has been written below the apex yet.
    self.assertIsNone(get_rrset(data, name, qtype))

    rrset = {
        'changetype': 'replace',
        'name': 'sub.' + name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": content,
                "disabled": False
            },
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that the new record is there
    data = self.get_zone(name)
    self.assertEqual(get_rrset(data, 'sub.' + name, qtype)['records'], rrset['records'])
1800
def test_rr_svcb(self):
    """An SVCB record with mandatory, alpn, ipv4hint and ech parameters is accepted."""
    zone_name, _, _ = self.create_zone()
    svcb_content = '40 . mandatory=alpn alpn=h2,h3 ipv4hint=192.0.2.1,192.0.2.2 ech="dG90YWxseSBib2d1cyBlY2hjb25maWcgdmFsdWU="'
    change = {
        'changetype': 'replace',
        'name': 'svcb.' + zone_name,
        'type': 'SVCB',
        'ttl': 3600,
        'records': [
            {"content": svcb_content, "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
1819
def test_rrset_ns_dname_exclude(self):
    """NS and DNAME may not coexist at a name below the apex.

    An NS RRset is created at delegation.<zone> first; adding a DNAME at
    the same name must then be refused with 422.
    """
    zone_name, _, _ = self.create_zone()
    owner = 'delegation.' + zone_name
    json_headers = {'content-type': 'application/json'}

    ns_change = {
        'changetype': 'replace',
        'name': owner,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {"content": "ns.example.org.", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [ns_change]}),
        headers=json_headers)
    self.assert_success(response)

    dname_change = {
        'changetype': 'replace',
        'name': owner,
        'type': 'DNAME',
        'ttl': 3600,
        'records': [
            {"content": "example.com.", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [dname_change]}),
        headers=json_headers)
    self.assertEqual(response.status_code, 422)
    self.assertIn('Cannot have both NS and DNAME except in zone apex', response.json()['error'])
1855
1856 ## FIXME: Enable this when it's time for it
1857 # def test_rrset_dname_nothing_under(self):
1858 # name, payload, zone = self.create_zone()
1859 # rrset = {
1860 # 'changetype': 'replace',
1861 # 'name': 'delegation.'+name,
1862 # 'type': 'DNAME',
1863 # 'ttl': 3600,
1864 # 'records': [
1865 # {
1866 # "content": "example.com.",
1867 # "disabled": False
1868 # }
1869 # ]
1870 # }
1871 # payload = {'rrsets': [rrset]}
1872 # r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1873 # headers={'content-type': 'application/json'})
1874 # self.assert_success(r)
1875 # rrset = {
1876 # 'changetype': 'replace',
1877 # 'name': 'sub.delegation.'+name,
1878 # 'type': 'A',
1879 # 'ttl': 3600,
1880 # 'records': [
1881 # {
1882 # "content": "1.2.3.4",
1883 # "disabled": False
1884 # }
1885 # ]
1886 # }
1887 # payload = {'rrsets': [rrset]}
1888 # r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1889 # headers={'content-type': 'application/json'})
1890 # self.assertEqual(r.status_code, 422)
1891 # self.assertIn('You cannot have record(s) under CNAME/DNAME', r.json()['error'])
1892
def test_create_zone_with_leading_space(self):
    """Record content that begins with a blank must be rejected with 422."""
    # Actual regression.
    zone_name, _, _ = self.create_zone()
    change = {
        'changetype': 'replace',
        'name': zone_name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {"content": " 4.3.2.1", "disabled": False},
        ],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [change]}),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn('Not in expected format', response.json()['error'])
1913
@unittest.skipIf(is_auth_lmdb(), "No out-of-zone storage in LMDB")
def test_zone_rr_delete_out_of_zone(self):
    """Deleting an RRset whose name lies outside the zone is allowed."""
    zone_name, _, _ = self.create_zone()
    deletion = {
        'changetype': 'delete',
        'name': 'not-in-zone.',
        'type': 'NS'
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps({'rrsets': [deletion]}),
        headers={'content-type': 'application/json'})
    # The raw response body is printed for inspection.
    print(response.content)
    # succeed so users can fix their wrong, old data
    self.assert_success(response)
1929
def test_zone_delete(self):
    """Deleting a zone answers 204 and sends no Content-Type header."""
    zone_name, _, _ = self.create_zone()
    zone_url = self.url("/api/v1/servers/localhost/zones/" + zone_name)
    response = self.session.delete(zone_url)
    self.assertEqual(response.status_code, 204)
    self.assertNotIn('Content-Type', response.headers)
1935
def test_zone_comment_create(self):
    # Attach two comments to the apex NS rrset, then check the comments,
    # the records and the TTL of the rrset as returned by the API.
    name, _, _ = self.create_zone()
    comments = [
        {'account': 'test1', 'content': 'blah blah'},
        {'account': 'test2', 'content': 'blah blah bleh'},
    ]
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'comments': comments,
    }
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps({'rrsets': [rrset]}),
        headers={'content-type': 'application/json'})
    if is_auth_lmdb():
        # No comments in LMDB: the request must produce a JSON error.
        self.assert_error_json(r)
        return
    self.assert_success(r)
    # make sure the comments have been set, and that the NS
    # records are still present
    serverset = get_rrset(self.get_zone(name), name, 'NS')
    print(serverset)
    self.assertNotEqual(serverset['records'], [])
    self.assertNotEqual(serverset['comments'], [])
    # verify that modified_at has been set by pdns
    first_comment = serverset['comments'][0]
    self.assertNotEqual(first_comment['modified_at'], 0)
    # verify that TTL is correct (regression test)
    self.assertEqual(serverset['ttl'], 3600)
1975
def test_zone_comment_delete(self):
    # Test: Delete ONLY comments. The PATCH sends an empty comment list
    # and no 'records' key for the apex NS rrset.
    zone_name, _, _ = self.create_zone()
    patch_body = {
        'rrsets': [{
            'changetype': 'replace',
            'name': zone_name,
            'type': 'NS',
            'comments': [],
        }],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps(patch_body),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # make sure the NS records are still present
    ns_rrset = get_rrset(self.get_zone(zone_name), zone_name, 'NS')
    print(ns_rrset)
    self.assertNotEqual(ns_rrset['records'], [])
    self.assertEqual(ns_rrset['comments'], [])
1997
@unittest.skipIf(is_auth_lmdb(), "No comments in LMDB")
def test_zone_comment_out_of_range_modified_at(self):
    # A comment whose 'modified_at' is '4294967297' must be rejected with
    # a 422 and an "out of range" error message.
    zone_name, _, _ = self.create_zone()
    oversized_comment = {
        'account': 'test1',
        'content': 'oh hi there',
        'modified_at': '4294967297',
    }
    patch_body = {
        'rrsets': [{
            'changetype': 'replace',
            'name': zone_name,
            'type': 'NS',
            'comments': [oversized_comment],
        }],
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=json.dumps(patch_body),
        headers={'content-type': 'application/json'})
    self.assertEqual(response.status_code, 422)
    self.assertIn("Key 'modified_at' is out of range", response.json()['error'])
2021
@unittest.skipIf(is_auth_lmdb(), "No comments in LMDB")
def test_zone_comment_stay_intact(self):
    # Test if comments on an rrset stay intact if the rrset is replaced
    name, _, _ = self.create_zone()

    def patch_zone(body):
        # Send one JSON PATCH to the zone created above.
        return self.session.patch(
            self.url("/api/v1/servers/localhost/zones/" + name),
            data=json.dumps(body),
            headers={'content-type': 'application/json'})

    # step 1: create a comment on the apex NS rrset
    comment_rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'comments': [{
            'account': 'test1',
            'content': 'oh hi there',
            'modified_at': 1111,
        }],
    }
    self.assert_success(patch_zone({'rrsets': [comment_rrset]}))

    # step 2: replace the records of that same rrset
    records_rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [{"content": "ns1.bar.com.", "disabled": False}],
    }
    self.assert_success(patch_zone({'rrsets': [records_rrset]}))

    # make sure the comments still exist
    serverset = get_rrset(self.get_zone(name), name, 'NS')
    print(serverset)
    self.assertEqual(serverset['records'], records_rrset['records'])
    self.assertEqual(serverset['comments'], comment_rrset['comments'])
2070
@unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
def test_search_rr_exact_zone(self):
    # Searching for the zone name (trailing dot stripped) must return the
    # zone object plus its two NS records and the SOA, in any order.
    name = unique_zone_name()
    self.create_zone(name=name, serial=22, soa_edit_api='')
    query = "/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')
    r = self.session.get(self.url(query))
    self.assert_success_json(r)
    print(r.json())

    def record(rtype, content):
        # Expected search hit for one record at the zone apex.
        return {
            'content': content,
            'zone_id': name, 'zone': name, 'object_type': 'record', 'disabled': False,
            'ttl': 3600, 'type': rtype, 'name': name,
        }

    expected = [
        {'object_type': 'zone', 'name': name, 'zone_id': name},
        record('NS', 'ns1.example.com.'),
        record('NS', 'ns2.example.com.'),
        record('SOA', 'a.misconfigured.dns.server.invalid. hostmaster.' + name + ' 22 10800 3600 604800 3600'),
    ]
    self.assertCountEqual(r.json(), expected)
2090
@unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
def test_search_rr_exact_zone_filter_type_zone(self):
    # With object_type=zone the search result must consist of the zone
    # object only.
    name = unique_zone_name()
    data_type = "zone"
    self.create_zone(name=name, serial=22, soa_edit_api='')
    query = "/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type
    r = self.session.get(self.url(query))
    self.assert_success_json(r)
    print(r.json())
    expected = [{'object_type': 'zone', 'name': name, 'zone_id': name}]
    self.assertEqual(r.json(), expected)
2102
@unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
def test_search_rr_exact_zone_filter_type_record(self):
    # With object_type=record the search result must hold the two NS
    # records and the SOA (in any order), but not the zone object.
    name = unique_zone_name()
    data_type = "record"
    self.create_zone(name=name, serial=22, soa_edit_api='')
    query = "/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type
    r = self.session.get(self.url(query))
    self.assert_success_json(r)
    print(r.json())

    def record(rtype, content):
        # Expected search hit for one record at the zone apex.
        return {
            'content': content,
            'zone_id': name, 'zone': name, 'object_type': 'record', 'disabled': False,
            'ttl': 3600, 'type': rtype, 'name': name,
        }

    expected = [
        record('NS', 'ns1.example.com.'),
        record('NS', 'ns2.example.com.'),
        record('SOA', 'a.misconfigured.dns.server.invalid. hostmaster.' + name + ' 22 10800 3600 604800 3600'),
    ]
    self.assertCountEqual(r.json(), expected)
2122
@unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
def test_search_rr_substring(self):
    # Wildcard search on a slice of the zone name (first and last five
    # characters cut off).
    zone_name = unique_zone_name()
    fragment = zone_name[5:-5]
    self.create_zone(name=zone_name)
    search_url = self.url("/api/v1/servers/localhost/search-data?q=*%s*" % fragment)
    response = self.session.get(search_url)
    self.assert_success_json(response)
    hits = response.json()
    print(hits)
    # should return zone, SOA, ns1, ns2
    self.assertEqual(len(hits), 4)
2133
@unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
def test_search_rr_case_insensitive(self):
    # The zone name ends in 'testsuffix.', the query uses 'testSUFFIX':
    # the search must match regardless of case.
    zone_name = unique_zone_name()+'testsuffix.'
    self.create_zone(name=zone_name)
    search_url = self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*")
    response = self.session.get(search_url)
    self.assert_success_json(response)
    hits = response.json()
    print(hits)
    # should return zone, SOA, ns1, ns2
    self.assertEqual(len(hits), 4)
2143
@unittest.skipIf(is_auth_lmdb(), "No search or comments in LMDB")
def test_search_rr_comment(self):
    # Create a zone with a commented AAAA rrset and search for the
    # comment text; exactly one hit of object_type 'comment' is expected.
    name = unique_zone_name()
    comment = {
        "account": "test AAAA",
        "content": "blah",
        "modified_at": 11112,
    }
    aaaa_rrset = {
        "name": name,
        "type": "AAAA",
        "ttl": 3600,
        "records": [{"content": "2001:DB8::1", "disabled": False}],
        "comments": [comment],
    }
    name, _, _ = self.create_zone(name=name, rrsets=[aaaa_rrset])
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=blah"))
    self.assert_success_json(r)
    hits = r.json()
    # should return the AAAA record
    self.assertEqual(len(hits), 1)
    hit = hits[0]
    self.assertEqual(hit['object_type'], 'comment')
    self.assertEqual(hit['type'], 'AAAA')
    self.assertEqual(hit['name'], name)
    self.assertEqual(hit['content'], comment['content'])
2171
@unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
def test_search_after_rectify_with_ent(self):
    # Create a zone with an A record two labels below the apex, rectify
    # it with pdnsutil and then search for the first label of the zone.
    zone_name = unique_zone_name()
    first_label = zone_name.split('.')[0]
    deep_record = {
        "name": 'sub.sub.' + zone_name,
        "type": "A",
        "ttl": 3600,
        "records": [{"content": "4.3.2.1", "disabled": False}],
    }
    self.create_zone(name=zone_name, rrsets=[deep_record])
    pdnsutil_rectify(zone_name)
    search_url = self.url("/api/v1/servers/localhost/search-data?q=*%s*" % first_label)
    response = self.session.get(search_url)
    self.assert_success_json(response)
    hits = response.json()
    print(hits)
    # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
    self.assertEqual(len(hits), 5)
2192
@unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
def test_default_api_rectify_dnssec(self):
    # Zone created with DNSSEC and NSEC3 parameters but without an
    # explicit api_rectify: the first AAAA row in the database must have
    # an ordername.
    name = unique_zone_name()
    hosts = (('a.', "2001:DB8::1"), ('b.', "2001:DB8::2"))
    rrsets = [
        {
            "name": prefix + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{"content": address, "disabled": False}],
        }
        for prefix, address in hosts
    ]
    self.create_zone(name=name, rrsets=rrsets, dnssec=True, nsec3param='1 0 1 ab')
    first_row = get_db_records(name, 'AAAA')[0]
    self.assertIsNotNone(first_row['ordername'])
2219
def test_default_api_rectify_nodnssec(self):
    """Without any DNSSEC settings, rectify should still add ENTs. Setup the zone
    so ENTs are necessary, and check for their existence using sdig.
    """
    name = unique_zone_name()
    rrsets = [
        {
            "name": 'a.sub.' + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::1",
                "disabled": False,
            }],
        },
        {
            "name": 'b.sub.' + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::2",
                "disabled": False,
            }],
        },
    ]
    self.create_zone(name=name, rrsets=rrsets)
    # default-api-rectify is yes (by default). expect rectify to have happened.
    # assertIn instead of a bare assert: it is not stripped under -O and
    # prints the sdig output when the check fails.
    self.assertIn('Rcode: 0 ', sdig('sub.' + name, 'TXT'))
2248
@unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
def test_override_api_rectify(self):
    # Zone created with DNSSEC and NSEC3 parameters but api_rectify=False:
    # the first AAAA row in the database must NOT have an ordername.
    name = unique_zone_name()
    rrsets = [
        {
            "name": 'a.' + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::1",
                "disabled": False,
            }],
        },
        {
            "name": 'b.' + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::2",
                "disabled": False,
            }],
        },
    ]
    self.create_zone(name=name, rrsets=rrsets, api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
    dbrecs = get_db_records(name, 'AAAA')
    self.assertIsNone(dbrecs[0]['ordername'])
2276
@unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
def test_explicit_rectify_success(self):
    # With api_rectify=False the SOA row has no ordername after creation;
    # an explicit PUT to .../rectify must answer 200 and fill it in.
    # (A stray second assignment target used to overwrite
    # self.create_zone with the returned tuple; it is gone.)
    name, _, data = self.create_zone(api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
    dbrecs = get_db_records(name, 'SOA')
    self.assertIsNone(dbrecs[0]['ordername'])
    r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/rectify"))
    self.assertEqual(r.status_code, 200)
    dbrecs = get_db_records(name, 'SOA')
    self.assertIsNotNone(dbrecs[0]['ordername'])
2286
def test_explicit_rectify_slave(self):
    # Some users want to move a zone to kind=Slave and then rectify, without a re-transfer.
    # (A stray second assignment target used to overwrite
    # self.create_zone with the returned tuple; it is gone.)
    name, _, data = self.create_zone(api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
    self.put_zone(data['id'], {'kind': 'Slave'})
    r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/rectify"))
    self.assertEqual(r.status_code, 200)
    if not is_auth_lmdb():
        # get_db_records is only used for non-LMDB backends.
        dbrecs = get_db_records(name, 'SOA')
        self.assertIsNotNone(dbrecs[0]['ordername'])
2296
def test_cname_at_ent_place(self):
    # First add an A record at sub2.sub1.<zone>, then a CNAME at
    # sub1.<zone>; each PATCH must be answered with 204.
    name, _, zone = self.create_zone(dnssec=True, api_rectify=True)
    steps = (
        ('sub2.sub1.' + name, "A", "4.3.2.1"),
        ('sub1.' + name, "CNAME", "www.example.org."),
    )
    for owner, rtype, content in steps:
        rrset = {
            'changetype': 'replace',
            'name': owner,
            'type': rtype,
            'ttl': 3600,
            'records': [{'content': content, 'disabled': False}],
        }
        r = self.session.patch(
            self.url("/api/v1/servers/localhost/zones/" + zone['id']),
            data=json.dumps({'rrsets': [rrset]}),
            headers={'content-type': 'application/json'})
        self.assertEqual(r.status_code, 204)
2331
def test_rrset_parameter_post_false(self):
    # Creating a zone with ?rrsets=false must answer 201 with a JSON body
    # that carries no 'rrsets' value.
    zone_spec = {
        'name': unique_zone_name(),
        'kind': 'Native',
        'nameservers': ['ns1.example.com.', 'ns2.example.com.'],
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones?rrsets=false"),
        data=json.dumps(zone_spec),
        headers={'content-type': 'application/json'})
    print(response.json())
    self.assert_success_json(response)
    self.assertEqual(response.status_code, 201)
    self.assertIsNone(response.json().get('rrsets'))
2347
def test_rrset_false_parameter(self):
    # Fetching a zone with rrsets="false" must not return any 'rrsets' value.
    zone_name = unique_zone_name()
    self.create_zone(name=zone_name, kind='Native')
    fetched = self.get_zone(zone_name, rrsets="false")
    self.assertIsNone(fetched.get('rrsets'))
2353
def test_rrset_true_parameter(self):
    # Fetching a zone with rrsets="true" must return its rrsets; two are
    # expected for the zone created here.
    zone_name = unique_zone_name()
    self.create_zone(name=zone_name, kind='Native')
    fetched = self.get_zone(zone_name, rrsets="true")
    rrset_count = len(fetched['rrsets'])
    self.assertEqual(rrset_count, 2)
2359
def test_wrong_rrset_parameter(self):
    # An unsupported value for the 'rrsets' request parameter is expected
    # to yield the error message below.
    zone_name = unique_zone_name()
    self.create_zone(name=zone_name, kind='Native')
    message = "'rrsets' request parameter value 'foobar' is not supported"
    self.get_zone(zone_name, rrsets="foobar", expect_error=message)
2367
def test_put_master_tsig_key_ids_non_existent(self):
    # Setting master_tsig_key_ids to a freshly generated key name (no
    # such key is created here) is expected to fail.
    zone_name = unique_zone_name()
    key_name = unique_zone_name().split('.')[0]
    self.create_zone(name=zone_name, kind='Native')
    update = {'master_tsig_key_ids': [key_name]}
    self.put_zone(zone_name, update, expect_error='A TSIG key with the name')
2376
def test_put_slave_tsig_key_ids_non_existent(self):
    # Setting slave_tsig_key_ids to a freshly generated key name (no
    # such key is created here) is expected to fail.
    zone_name = unique_zone_name()
    key_name = unique_zone_name().split('.')[0]
    self.create_zone(name=zone_name, kind='Native')
    update = {'slave_tsig_key_ids': [key_name]}
    self.put_zone(zone_name, update, expect_error='A TSIG key with the name')
2385
def test_zone_replace_rrsets_basic(self):
    """Basic test: all automatic modification is off, on replace the new rrsets are ingested as is."""
    name, _, _ = self.create_zone(dnssec=False, soa_edit='', soa_edit_api='')

    def make_rrset(owner, rtype, *contents):
        # One rrset with TTL 3600 and a record per given content string.
        return {'name': owner, 'type': rtype, 'ttl': 3600, 'records': [{'content': c} for c in contents]}

    rrsets = [
        make_rrset(name, 'SOA', 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'),
        make_rrset(name, 'NS', 'ns1.example.org.', 'ns2.example.org.'),
        make_rrset('www.' + name, 'A', '192.0.2.1'),
        make_rrset('sub.' + name, 'NS', 'ns1.example.org.'),
    ]
    self.put_zone(name, {'rrsets': rrsets})

    fetched = self.get_zone(name)
    # Add the 'comments' and 'disabled' fields we did not send, so the
    # sent rrsets can be compared with what get_zone returned.
    for sent in rrsets:
        sent.setdefault('comments', [])
        for rec in sent['records']:
            rec.setdefault('disabled', False)
    assert_eq_rrsets(fetched['rrsets'], rrsets)
2403
def test_zone_replace_rrsets_dnssec(self):
    """With dnssec: check automatic rectify is done"""
    name, _, _ = self.create_zone(dnssec=True)
    rrsets = [
        {'name': name, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
        {'name': name, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
        {'name': 'www.' + name, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
    ]
    self.put_zone(name, {'rrsets': rrsets})

    if not is_auth_lmdb():
        # lmdb: skip, no get_db_records implementations
        dbrecs = get_db_records(name, 'A')
        # default = rectify enabled. assertIsNotNone instead of a bare
        # assert, so the check is not stripped under -O.
        self.assertIsNotNone(dbrecs[0]['ordername'])
2418
def test_zone_replace_rrsets_with_soa_edit(self):
    """SOA-EDIT was enabled before rrsets will be replaced"""
    name, _, _ = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
    rrsets = [
        {'name': name, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
        {'name': name, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
        {'name': 'www.' + name, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
        {'name': 'sub.' + name, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}]},
    ]
    self.put_zone(name, {'rrsets': rrsets})

    data = self.get_zone(name)
    soa = [rrset['records'][0]['content'] for rrset in data['rrsets'] if rrset['type'] == 'SOA'][0]
    # The third whitespace-separated field of the SOA content is the
    # serial; it must be larger than the 1 we sent. assertGreater instead
    # of a bare assert, so the check survives -O and reports both values.
    self.assertGreater(int(soa.split()[2]), 1)
2433
def test_zone_replace_rrsets_no_soa_primary(self):
    """Replace all RRsets but supply no SOA. Should fail."""
    zone_name, _, _ = self.create_zone()
    ns_only = {
        'name': zone_name,
        'type': 'NS',
        'ttl': 3600,
        'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}],
    }
    self.put_zone(
        zone_name,
        {'rrsets': [ns_only]},
        expect_error='Must give SOA record for zone when replacing all RR sets')
2441
@parameterized.expand([
    (None, []),
    (
        None,
        [{
            'name': '$NAME$',
            'type': 'SOA',
            'ttl': 3600,
            'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}],
        }],
    ),
])
def test_zone_replace_rrsets_secondary(self, expected_error, rrsets):
    """
    Replace all RRsets in a SECONDARY zone.

    If no SOA is given, this should still succeed, also setting zone stale (but cannot assert this here).
    """
    zone_name, _, _ = self.create_zone(kind='Secondary', nameservers=None, masters=['127.0.0.2'])
    replacement = templated_rrsets(rrsets, zone_name)
    self.put_zone(zone_name, {'rrsets': replacement}, expect_error=expected_error)
2456
@parameterized.expand([
    (None, []),
    (
        "Modifying RRsets in Consumer zones is unsupported",
        [{
            'name': '$NAME$',
            'type': 'SOA',
            'ttl': 3600,
            'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}],
        }],
    ),
])
def test_zone_replace_rrsets_consumer(self, expected_error, rrsets):
    # Replace all RRsets in a Consumer zone: an empty replacement expects
    # no error, a replacement containing an SOA expects the error above.
    zone_name, _, _ = self.create_zone(kind='Consumer', nameservers=None, masters=['127.0.0.2'])
    replacement = templated_rrsets(rrsets, zone_name)
    self.put_zone(zone_name, {'rrsets': replacement}, expect_error=expected_error)
2466
def test_zone_replace_rrsets_negative_ttl(self):
    # Replacing the rrsets with an SOA whose ttl is -1 must be rejected.
    zone_name, _, _ = self.create_zone(dnssec=False, soa_edit='', soa_edit_api='')
    negative_ttl_soa = {
        'name': zone_name,
        'type': 'SOA',
        'ttl': -1,
        'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}],
    }
    self.put_zone(
        zone_name,
        {'rrsets': [negative_ttl_soa]},
        expect_error="Key 'ttl' is not a positive Integer")
2473
@parameterized.expand([
    (
        "IN MX: non-hostname content",
        [{'name': '$NAME$', 'type': 'MX', 'ttl': 3600, 'records': [{"content": "10 mail@mx.example.org."}]}],
    ),
    (
        "out of zone",
        [{'name': 'not-in-zone.', 'type': 'NS', 'ttl': 3600, 'records': [{"content": "ns1.example.org."}]}],
    ),
    (
        "contains unsupported characters",
        [{'name': 'test:.$NAME$', 'type': 'NS', 'ttl': 3600, 'records': [{"content": "ns1.example.org."}]}],
    ),
    (
        "unknown type",
        [{'name': '$NAME$', 'type': 'INVALID', 'ttl': 3600, 'records': [{"content": "192.0.2.1"}]}],
    ),
    (
        "Conflicts with another RRset",
        [{'name': '$NAME$', 'type': 'CNAME', 'ttl': 3600, 'records': [{"content": "example.org."}]}],
    ),
])
def test_zone_replace_rrsets_invalid(self, expected_error, invalid_rrsets):
    """Test validation of RRsets before replacing them"""
    zone_name, _, _ = self.create_zone(dnssec=False, soa_edit='', soa_edit_api='')
    # An SOA and an NS rrset at the apex come first; the invalid rrsets
    # of the current case are appended to them.
    soa_rrset = {
        'name': zone_name,
        'type': 'SOA',
        'ttl': 3600,
        'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}],
    }
    ns_rrset = {
        'name': zone_name,
        'type': 'NS',
        'ttl': 3600,
        'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}],
    }
    rrsets = [soa_rrset, ns_rrset] + templated_rrsets(invalid_rrsets, zone_name)
    self.put_zone(zone_name, {'rrsets': rrsets}, expect_error=expected_error)
2490
2491
@unittest.skipIf(not is_auth(), "Not applicable")
class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
    """API tests that create and update the root zone ('.', zone id '=2E')."""

    def setUp(self):
        super().setUp()
        # zone name is not unique, so delete the zone before each individual test.
        root_zone_url = self.url("/api/v1/servers/localhost/zones/=2E")
        self.session.delete(root_zone_url)

    def test_create_zone(self):
        # Create the root zone and verify the returned fields, the
        # generated SOA, the zone list and a fetch by zone id.
        _, payload, created = self.create_zone(name='.', serial=22, soa_edit_api='')
        expected_keys = (
            'id', 'url', 'name', 'masters', 'kind', 'last_check',
            'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account',
        )
        for key in expected_keys:
            self.assertIn(key, created)
            if key in payload:
                self.assertEqual(created[key], payload[key])
        # validate generated SOA
        soa = get_first_rec(created, '.', 'SOA')
        expected_soa = (
            "a.misconfigured.dns.server.invalid. hostmaster. " + str(payload['serial']) +
            " 10800 3600 604800 3600"
        )
        self.assertEqual(soa['content'], expected_soa)
        # Regression test: verify zone list works
        zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
        print("zonelist:", zonelist)
        listed_names = [entry['name'] for entry in zonelist]
        self.assertIn(payload['name'], listed_names)
        # Also test that fetching the zone works.
        print("id:", created['id'])
        self.assertEqual(created['id'], '=2E')
        fetched = self.get_zone(created['id'])
        print("zone (fetched):", fetched)
        for key in ('name', 'kind'):
            self.assertIn(key, fetched)
            self.assertEqual(fetched[key], payload[key])
        self.assertEqual(fetched['rrsets'][0]['name'], '.')

    def test_update_zone(self):
        # Apply two consecutive updates to the root zone and check after
        # each one that every sent field is reported back unchanged.
        self.create_zone(name='.')
        zone_id = '=2E'
        updates = (
            # update, set as Master and enable SOA-EDIT-API
            {
                'kind': 'Master',
                'masters': ['192.0.2.1', '192.0.2.2'],
                'soa_edit_api': 'EPOCH',
                'soa_edit': 'EPOCH',
            },
            # update, back to Native and empty(off)
            {
                'kind': 'Native',
                'soa_edit_api': '',
                'soa_edit': '',
            },
        )
        for update in updates:
            self.put_zone(zone_id, update)
            current = self.get_zone(zone_id)
            for key, sent in update.items():
                self.assertIn(key, current)
                self.assertEqual(current[key], sent)
2553
2554
@unittest.skipIf(not is_recursor(), "Not applicable")
class RecursorZones(ApiTestCase):
    """Zone API tests that only run against the recursor."""

    def create_zone(self, name=None, kind=None, rd=False, servers=None):
        # POST a new zone and return (sent payload, parsed JSON response).
        # A unique name is generated when none is given; 'servers'
        # defaults to an empty list.
        payload = {
            'name': unique_zone_name() if name is None else name,
            'kind': kind,
            'servers': [] if servers is None else servers,
            'recursion_desired': rd,
        }
        response = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success_json(response)
        return payload, response.json()

    def test_create_auth_zone(self):
        # Every field sent for a Native zone must be echoed back.
        payload, data = self.create_zone(kind='Native')
        for key, sent in payload.items():
            self.assertEqual(data[key], sent)

    def test_create_zone_no_name(self):
        # An empty zone name must be rejected with a 422.
        payload = dict(name='', kind='Native', servers=['8.8.8.8'], recursion_desired=False)
        print(payload)
        response = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assertEqual(response.status_code, 422)
        self.assertIn('is not canonical', response.json()['error'])

    def test_create_forwarded_zone(self):
        payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
        # return values are normalized
        payload['servers'][0] += ':53'
        for key, sent in payload.items():
            self.assertEqual(data[key], sent)

    def test_create_forwarded_rd_zone(self):
        payload, data = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
        # return values are normalized
        payload['servers'][0] += ':53'
        for key, sent in payload.items():
            self.assertEqual(data[key], sent)

    def test_create_auth_zone_with_symbols(self):
        # A '/' in the zone name must show up as '=2F' in the zone id.
        zone_name = 'foo/bar.'+unique_zone_name()
        payload, data = self.create_zone(name=zone_name, kind='Native')
        expected_id = payload['name'].replace('/', '=2F')
        for key, sent in payload.items():
            self.assertEqual(data[key], sent)
        self.assertEqual(data['id'], expected_id)

    def test_rename_auth_zone(self):
        original_payload, _ = self.create_zone(kind='Native')
        old_name = original_payload['name']
        # now rename it
        renamed = {
            'name': 'renamed-'+old_name,
            'kind': 'Native',
            'recursion_desired': False,
        }
        response = self.session.put(
            self.url("/api/v1/servers/localhost/zones/" + old_name),
            data=json.dumps(renamed),
            headers={'content-type': 'application/json'})
        self.assert_success(response)
        # The zone must now be retrievable under its new name.
        fetched = self.session.get(self.url("/api/v1/servers/localhost/zones/" + renamed['name'])).json()
        for key, sent in renamed.items():
            self.assertEqual(fetched[key], sent)

    def test_zone_delete(self):
        # DELETE answers 204 and carries no Content-Type header.
        payload, _ = self.create_zone(kind='Native')
        zone_url = self.url("/api/v1/servers/localhost/zones/" + payload['name'])
        response = self.session.delete(zone_url)
        self.assertEqual(response.status_code, 204)
        self.assertNotIn('Content-Type', response.headers)

    def test_search_rr_exact_zone(self):
        # Searching for the full zone name must return the zone object only.
        name = unique_zone_name()
        self.create_zone(name=name, kind='Native')
        response = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name))
        self.assert_success_json(response)
        print(response.json())
        expected = [{'type': 'zone', 'name': name, 'zone_id': name}]
        self.assertEqual(response.json(), expected)

    def test_search_rr_substring(self):
        zone_name = 'search-rr-zone.name.'
        self.create_zone(name=zone_name, kind='Native')
        response = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
        self.assert_success_json(response)
        hits = response.json()
        print(hits)
        # should return zone, SOA
        self.assertEqual(len(hits), 2)
2658
2659 @unittest.skipIf(not is_auth(), "Not applicable")
2660 class AuthZoneKeys(ApiTestCase, AuthZonesHelperMixin):
2661
def test_get_keys(self):
    # Fetch the cryptokeys of powerdnssec.org. and compare the first key,
    # minus its 'dnskey' and 'ds' fields, with the expected attributes.
    response = self.session.get(
        self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
    self.assert_success_json(response)
    keys = response.json()
    self.assertGreater(len(keys), 0)

    # Work on a copy: keys[0]['dnskey'] is still needed further down.
    first_key = deepcopy(keys[0])
    for field in ('dnskey', 'ds'):
        first_key.pop(field)
    expected = {
        'algorithm': 'ECDSAP256SHA256',
        'bits': 256,
        'active': True,
        'type': 'Cryptokey',
        'keytype': 'csk',
        'flags': 257,
        'published': True,
        'id': 1,
    }
    self.assertEqual(first_key, expected)

    # The dnskey string must consist of four whitespace-separated fields.
    dnskey_fields = keys[0]['dnskey'].split()
    self.assertEqual(len(dnskey_fields), 4)
2685
def test_get_keys_with_cds(self):
    # Set PUBLISH-CDS metadata (digest type 4) on powerdnssec.org. and
    # check that the first cryptokey then carries exactly one 'cds'
    # entry, which is also listed in 'ds' and whose third field is '4'.
    payload_metadata = {"type": "Metadata", "kind": "PUBLISH-CDS", "metadata": ["4"]}
    r = self.session.post(self.url("/api/v1/servers/localhost/zones/powerdnssec.org./metadata"),
                          data=json.dumps(payload_metadata))
    # powerdnssec.org. is a fixed, shared zone: always remove the
    # metadata again, even when an assertion below fails, so it cannot
    # leak into other tests that read this zone's cryptokeys.
    try:
        rdata = r.json()
        self.assertEqual(r.status_code, 201)
        self.assertEqual(rdata["metadata"], payload_metadata["metadata"])

        r = self.session.get(
            self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
        self.assert_success_json(r)
        keys = r.json()
        self.assertGreater(len(keys), 0)

        key0 = deepcopy(keys[0])
        self.assertEqual(len(key0['cds']), 1)
        self.assertIn(key0['cds'][0], key0['ds'])
        self.assertEqual(key0['cds'][0].split()[2], '4')
        del key0['dnskey']
        del key0['ds']
        del key0['cds']
        expected = {
            u'algorithm': u'ECDSAP256SHA256',
            u'bits': 256,
            u'active': True,
            u'type': u'Cryptokey',
            u'keytype': u'csk',
            u'flags': 257,
            u'published': True,
            u'id': 1}
        self.assertEqual(key0, expected)

        keydata = keys[0]['dnskey'].split()
        self.assertEqual(len(keydata), 4)
    finally:
        r = self.session.delete(self.url("/api/v1/servers/localhost/zones/powerdnssec.org./metadata/PUBLISH-CDS"))
    # Only reached when everything above passed; a failure above
    # propagates after the cleanup request has been sent.
    self.assertEqual(r.status_code, 204)