]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
8a4871a988b1252fcd95a548008b3372667ec931
1 from __future__
import print_function
6 import requests
.exceptions
7 from copy
import deepcopy
8 from parameterized
import parameterized
9 from pprint
import pprint
10 from test_helper
import ApiTestCase
, unique_zone_name
, is_auth
, is_auth_lmdb
, is_recursor
, get_db_records
, pdnsutil_rectify
, sdig
13 def get_rrset(data
, qname
, qtype
):
14 for rrset
in data
['rrsets']:
15 if rrset
['name'] == qname
and rrset
['type'] == qtype
:
20 def get_first_rec(data
, qname
, qtype
):
21 rrset
= get_rrset(data
, qname
, qtype
)
23 return rrset
['records'][0]
27 def eq_zone_rrsets(rrsets
, expected
):
30 for type_
, expected_records
in expected
.items():
32 data_got
[type_
] = set()
33 data_expected
[type_
] = set()
34 uses_name
= any(['name' in expected_record
for expected_record
in expected_records
])
35 # minify + convert received data
36 for rrset
in [rrset
for rrset
in rrsets
if rrset
['type'] == type_
]:
38 for r
in rrset
['records']:
39 data_got
[type_
].add((rrset
['name'] if uses_name
else '@', rrset
['type'], r
['content']))
40 # minify expected data
41 for r
in expected_records
:
42 data_expected
[type_
].add((r
['name'] if uses_name
else '@', type_
, r
['content']))
44 print("eq_zone_rrsets: got:")
46 print("eq_zone_rrsets: expected:")
49 assert data_got
== data_expected
, "%r != %r" % (data_got
, data_expected
)
def assert_eq_rrsets(rrsets, expected):
    """Assert rrsets sets are equal, ignoring sort order.

    Both lists are sorted by the ``(name, type)`` pair of each rrset and the
    sorted lists are then compared for equality.  ``sorted`` builds new
    lists, so neither argument is modified.

    Raises:
        AssertionError: if the two sorted lists differ.
    """
    def _name_and_type(rrset):
        # Sort key: an rrset is identified by its owner name and its type.
        return (rrset['name'], rrset['type'])

    got_sorted = sorted(rrsets, key=_name_and_type)
    expected_sorted = sorted(expected, key=_name_and_type)
    # Same "%r != %r" failure message shape as eq_zone_rrsets, so a mismatch
    # shows both sides instead of a bare AssertionError.
    assert got_sorted == expected_sorted, "%r != %r" % (got_sorted, expected_sorted)
58 def templated_rrsets(rrsets
: list, zonename
: str):
60 Replace $NAME$ in `name` and `content` of given rrsets with `zonename`.
61 Will return a copy. Original rrsets should stay unmodified.
65 new_rrset
= rrset |
{"name": rrset
["name"].replace('$NAME$', zonename
)}
67 if "records" in rrset
:
69 for record
in rrset
["records"]:
70 records
.append(record |
{"content": record
["content"].replace('$NAME$', zonename
)})
71 new_rrset
["records"] = records
73 new_rrsets
.append(new_rrset
)
78 class Zones(ApiTestCase
):
80 def _test_list_zones(self
, dnssec
=True):
81 path
= "/api/v1/servers/localhost/zones"
83 path
= path
+ "?dnssec=false"
84 r
= self
.session
.get(self
.url(path
))
85 self
.assert_success_json(r
)
87 example_com
= [domain
for domain
in domains
if domain
['name'] in ('example.com', 'example.com.')]
88 self
.assertEqual(len(example_com
), 1)
89 example_com
= example_com
[0]
91 required_fields
= ['id', 'url', 'name', 'kind']
93 required_fields
= required_fields
+ ['masters', 'last_check', 'notified_serial', 'serial', 'account', 'catalog']
95 required_fields
= required_fields
= ['dnssec', 'edited_serial']
96 self
.assertNotEqual(example_com
['serial'], 0)
98 self
.assertNotIn('dnssec', example_com
)
100 required_fields
= required_fields
+ ['recursion_desired', 'servers']
101 for field
in required_fields
:
102 self
.assertIn(field
, example_com
)
104 def test_list_zones_with_dnssec(self
):
106 self
._test
_list
_zones
(True)
def test_list_zones_without_dnssec(self):
    """Run the shared zone-list check with DNSSEC details turned off."""
    # Delegates to the shared helper; False is bound to its `dnssec` parameter.
    self._test_list_zones(dnssec=False)
112 class AuthZonesHelperMixin(object):
113 def create_zone(self
, name
=None, expect_error
=None, **kwargs
):
115 name
= unique_zone_name()
119 "nameservers": ["ns1.example.com.", "ns2.example.com."]
121 for k
, v
in kwargs
.items():
126 if "zone" in payload
:
127 payload
["zone"] = payload
["zone"].replace("$NAME$", payload
["name"])
128 if "rrsets" in payload
:
129 payload
["rrsets"] = templated_rrsets(payload
["rrsets"], payload
["name"])
131 print("Create zone", name
, "with:", payload
)
132 r
= self
.session
.post(
133 self
.url("/api/v1/servers/localhost/zones"),
134 data
=json
.dumps(payload
),
135 headers
={"content-type": "application/json"})
138 self
.assertEqual(r
.status_code
, 422, r
.content
)
140 if expect_error
is True:
143 self
.assertIn(expect_error
, reply
["error"])
146 self
.assertEqual(r
.status_code
, 201, r
.content
)
149 return name
, payload
, reply
151 def get_zone(self
, api_zone_id
, expect_error
=None, **kwargs
):
152 print("GET zone", api_zone_id
, "args:", kwargs
)
153 r
= self
.session
.get(
154 self
.url("/api/v1/servers/localhost/zones/" + api_zone_id
),
159 print("reply", reply
)
162 self
.assertEqual(r
.status_code
, 422)
163 if expect_error
is True:
166 self
.assertIn(expect_error
, r
.json()['error'])
169 self
.assert_success_json(r
)
170 self
.assertEqual(r
.status_code
, 200)
174 def put_zone(self
, api_zone_id
, payload
, expect_error
=None):
175 print("PUT zone", api_zone_id
, "with:", payload
)
176 r
= self
.session
.put(
177 self
.url("/api/v1/servers/localhost/zones/" + api_zone_id
),
178 data
=json
.dumps(payload
),
179 headers
={'content-type': 'application/json'})
181 print("reply status code:", r
.status_code
)
183 self
.assertEqual(r
.status_code
, 422, r
.content
)
185 if expect_error
is True:
188 self
.assertIn(expect_error
, reply
['error'])
190 # expect success (no content)
191 self
.assertEqual(r
.status_code
, 204, r
.content
)
193 @unittest.skipIf(not is_auth(), "Not applicable")
194 class AuthZones(ApiTestCase
, AuthZonesHelperMixin
):
196 def test_create_zone(self
):
197 # soa_edit_api has a default, override with empty for this test
198 name
, payload
, data
= self
.create_zone(serial
=22, soa_edit_api
='')
199 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'edited_serial', 'soa_edit_api', 'soa_edit', 'account'):
200 self
.assertIn(k
, data
)
202 self
.assertEqual(data
[k
], payload
[k
])
203 # validate generated SOA
204 expected_soa
= "a.misconfigured.dns.server.invalid. hostmaster." + name
+ " " + \
205 str(payload
['serial']) + " 10800 3600 604800 3600"
207 get_first_rec(data
, name
, 'SOA')['content'],
211 if not is_auth_lmdb():
212 # Because we had confusion about dots, check that the DB is without dots.
213 dbrecs
= get_db_records(name
, 'SOA')
214 self
.assertEqual(dbrecs
[0]['content'], expected_soa
.replace('. ', ' '))
215 self
.assertNotEqual(data
['serial'], data
['edited_serial'])
217 def test_create_zone_with_soa_edit_api(self
):
218 # soa_edit_api wins over serial
219 name
, payload
, data
= self
.create_zone(soa_edit_api
='EPOCH', serial
=10)
220 for k
in ('soa_edit_api', ):
221 self
.assertIn(k
, data
)
223 self
.assertEqual(data
[k
], payload
[k
])
224 # generated EPOCH serial surely is > fixed serial we passed in
226 self
.assertGreater(data
['serial'], payload
['serial'])
227 soa_serial
= int(get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2])
228 self
.assertGreater(soa_serial
, payload
['serial'])
229 self
.assertEqual(soa_serial
, data
['serial'])
231 def test_create_zone_with_catalog(self
):
232 # soa_edit_api wins over serial
233 name
, payload
, data
= self
.create_zone(catalog
='catalog.invalid.', serial
=10)
235 for k
in ('catalog', ):
236 self
.assertIn(k
, data
)
238 self
.assertEqual(data
[k
], payload
[k
])
240 # check that the catalog is reflected in the /zones output (#13633)
241 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
242 self
.assert_success_json(r
)
244 domain
= [domain
for domain
in domains
if domain
['name'] == name
]
245 self
.assertEqual(len(domain
), 1)
247 self
.assertEqual(domain
["catalog"], "catalog.invalid.")
249 def test_create_zone_with_account(self
):
250 # soa_edit_api wins over serial
251 name
, payload
, data
= self
.create_zone(account
='anaccount', serial
=10, kind
='Master')
253 for k
in ('account', ):
254 self
.assertIn(k
, data
)
256 self
.assertEqual(data
[k
], payload
[k
])
258 # as we did not set a catalog in our request, check that the default catalog was applied
259 self
.assertEqual(data
['catalog'], "default-catalog.example.com.")
261 def test_create_zone_default_soa_edit_api(self
):
262 name
, payload
, data
= self
.create_zone()
264 self
.assertEqual(data
['soa_edit_api'], 'DEFAULT')
266 def test_create_zone_exists(self
):
267 name
, payload
, data
= self
.create_zone()
274 r
= self
.session
.post(
275 self
.url("/api/v1/servers/localhost/zones"),
276 data
=json
.dumps(payload
),
277 headers
={'content-type': 'application/json'})
278 self
.assertEqual(r
.status_code
, 409) # Conflict - already exists
280 def test_create_zone_with_soa_edit(self
):
281 name
, payload
, data
= self
.create_zone(soa_edit
='INCEPTION-INCREMENT', soa_edit_api
='SOA-EDIT-INCREASE')
283 self
.assertEqual(data
['soa_edit'], 'INCEPTION-INCREMENT')
284 self
.assertEqual(data
['soa_edit_api'], 'SOA-EDIT-INCREASE')
285 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
286 # These particular settings lead to the first serial set to YYYYMMDD01.
287 self
.assertEqual(soa_serial
[-2:], '01')
289 'changetype': 'replace',
295 "content": "127.0.0.1",
300 payload
= {'rrsets': [rrset
]}
301 r
= self
.session
.patch(
302 self
.url("/api/v1/servers/localhost/zones/" + data
['id']),
303 data
=json
.dumps(payload
),
304 headers
={'content-type': 'application/json'})
305 data
= self
.get_zone(data
['id'])
306 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
307 self
.assertEqual(soa_serial
[-2:], '02')
308 self
.assertEqual(r
.headers
['X-PDNS-Old-Serial'][-2:], '01')
309 self
.assertEqual(r
.headers
['X-PDNS-New-Serial'][-2:], '02')
311 def test_create_zone_with_records(self
):
312 name
= unique_zone_name()
318 "content": "4.3.2.1",
322 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
323 # check our record has appeared
324 self
.assertEqual(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
326 def test_create_zone_with_wildcard_records(self
):
327 name
= unique_zone_name()
333 "content": "4.3.2.1",
337 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
338 # check our record has appeared
339 self
.assertEqual(get_rrset(data
, rrset
['name'], 'A')['records'], rrset
['records'])
341 def test_create_zone_with_comments(self
):
342 name
= unique_zone_name()
346 "type": "soa", # test uppercasing of type, too.
349 "content": "blah blah and test a few non-ASCII chars: ö, €",
350 "modified_at": 11112,
358 "content": "2001:DB8::1",
362 "account": "test AAAA",
363 "content": "blah blah AAAA",
364 "modified_at": 11112,
372 "content": "\"test TXT\"",
381 "content": "192.0.2.1",
388 # No comments in LMDB
389 self
.create_zone(name
=name
, rrsets
=rrsets
, expect_error
="Hosting backend does not support editing comments.")
392 name
, _
, data
= self
.create_zone(name
=name
, rrsets
=rrsets
)
393 # NS records have been created
394 self
.assertEqual(len(data
['rrsets']), len(rrsets
) + 1)
395 # check our comment has appeared
396 self
.assertEqual(get_rrset(data
, name
, 'SOA')['comments'], rrsets
[0]['comments'])
397 self
.assertEqual(get_rrset(data
, name
, 'A')['comments'], [])
398 self
.assertEqual(get_rrset(data
, name
, 'TXT')['comments'], [])
399 self
.assertEqual(get_rrset(data
, name
, 'AAAA')['comments'], rrsets
[1]['comments'])
401 def test_create_zone_uncanonical_nameservers(self
):
402 name
= unique_zone_name()
406 'nameservers': ['uncanon.example.com']
409 r
= self
.session
.post(
410 self
.url("/api/v1/servers/localhost/zones"),
411 data
=json
.dumps(payload
),
412 headers
={'content-type': 'application/json'})
413 self
.assertEqual(r
.status_code
, 422)
414 self
.assertIn('Nameserver is not canonical', r
.json()['error'])
416 def test_create_auth_zone_no_name(self
):
417 name
= unique_zone_name()
423 r
= self
.session
.post(
424 self
.url("/api/v1/servers/localhost/zones"),
425 data
=json
.dumps(payload
),
426 headers
={'content-type': 'application/json'})
427 self
.assertEqual(r
.status_code
, 422)
428 self
.assertIn('is not canonical', r
.json()['error'])
430 def test_create_zone_with_custom_soa(self
):
431 name
= unique_zone_name()
432 content
= u
"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
435 "type": "soa", # test uppercasing of type, too.
442 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
], soa_edit_api
='')
443 self
.assertEqual(get_rrset(data
, name
, 'SOA')['records'], rrset
['records'])
444 if not is_auth_lmdb():
445 dbrecs
= get_db_records(name
, 'SOA')
446 self
.assertEqual(dbrecs
[0]['content'], content
.replace('. ', ' '))
448 def test_create_zone_double_dot(self
):
449 name
= 'test..' + unique_zone_name()
453 'nameservers': ['ns1.example.com.']
456 r
= self
.session
.post(
457 self
.url("/api/v1/servers/localhost/zones"),
458 data
=json
.dumps(payload
),
459 headers
={'content-type': 'application/json'})
460 self
.assertEqual(r
.status_code
, 422)
461 self
.assertIn('Unable to parse Zone Name', r
.json()['error'])
463 def test_create_zone_restricted_chars(self
):
464 name
= 'test:' + unique_zone_name() # : isn't good as a name.
468 'nameservers': ['ns1.example.com']
471 r
= self
.session
.post(
472 self
.url("/api/v1/servers/localhost/zones"),
473 data
=json
.dumps(payload
),
474 headers
={'content-type': 'application/json'})
475 self
.assertEqual(r
.status_code
, 422)
476 self
.assertIn('contains unsupported characters', r
.json()['error'])
478 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self
):
479 name
= unique_zone_name()
485 "content": "ns2.example.com.",
492 'nameservers': ['ns1.example.com.'],
496 r
= self
.session
.post(
497 self
.url("/api/v1/servers/localhost/zones"),
498 data
=json
.dumps(payload
),
499 headers
={'content-type': 'application/json'})
500 self
.assertEqual(r
.status_code
, 422)
501 self
.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r
.json()['error'])
503 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self
):
504 name
= unique_zone_name()
506 "name": 'subzone.'+name
,
510 "content": "ns2.example.com.",
517 'nameservers': ['ns1.example.com.'],
521 r
= self
.session
.post(
522 self
.url("/api/v1/servers/localhost/zones"),
523 data
=json
.dumps(payload
),
524 headers
={'content-type': 'application/json'})
525 self
.assert_success_json(r
)
527 def test_create_zone_with_symbols(self
):
528 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
529 name
= payload
['name']
530 expected_id
= name
.replace('/', '=2F')
531 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
532 self
.assertIn(k
, data
)
534 self
.assertEqual(data
[k
], payload
[k
])
535 self
.assertEqual(data
['id'], expected_id
)
536 if not is_auth_lmdb():
537 dbrecs
= get_db_records(name
, 'SOA')
538 self
.assertEqual(dbrecs
[0]['name'], name
.rstrip('.'))
540 def test_create_zone_with_nameservers_non_string(self
):
541 # ensure we don't crash
542 name
= unique_zone_name()
546 'nameservers': [{'a': 'ns1.example.com'}] # invalid
549 r
= self
.session
.post(
550 self
.url("/api/v1/servers/localhost/zones"),
551 data
=json
.dumps(payload
),
552 headers
={'content-type': 'application/json'})
553 self
.assertEqual(r
.status_code
, 422)
555 def test_create_zone_with_dnssec(self
):
557 Create a zone with "dnssec" set and see if a key was made.
559 name
= unique_zone_name()
560 name
, payload
, data
= self
.create_zone(dnssec
=True)
564 for k
in ('dnssec', ):
565 self
.assertIn(k
, data
)
567 self
.assertEqual(data
[k
], payload
[k
])
569 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
575 self
.assertEqual(r
.status_code
, 200)
576 self
.assertEqual(len(keys
), 1)
577 self
.assertEqual(keys
[0]['type'], 'Cryptokey')
578 self
.assertEqual(keys
[0]['active'], True)
579 self
.assertEqual(keys
[0]['keytype'], 'csk')
581 def test_create_zone_with_dnssec_disable_dnssec(self
):
583 Create a zone with "dnssec", then set "dnssec" to false and see if the
586 name
= unique_zone_name()
587 name
, payload
, data
= self
.create_zone(dnssec
=True)
589 self
.put_zone(name
, {'dnssec': False})
591 zoneinfo
= self
.get_zone(name
)
592 self
.assertEqual(zoneinfo
['dnssec'], False)
594 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
598 self
.assertEqual(r
.status_code
, 200)
599 self
.assertEqual(len(keys
), 0)
601 def test_create_zone_with_nsec3param(self
):
603 Create a zone with "nsec3param" set and see if the metadata was added.
605 name
= unique_zone_name()
606 nsec3param
= '1 0 100 aabbccddeeff'
607 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
)
611 for k
in ('dnssec', 'nsec3param'):
612 self
.assertIn(k
, data
)
614 self
.assertEqual(data
[k
], payload
[k
])
616 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3PARAM'))
622 self
.assertEqual(r
.status_code
, 200)
623 self
.assertEqual(len(data
['metadata']), 1)
624 self
.assertEqual(data
['kind'], 'NSEC3PARAM')
625 self
.assertEqual(data
['metadata'][0], nsec3param
)
627 def test_create_zone_with_nsec3narrow(self
):
629 Create a zone with "nsec3narrow" set and see if the metadata was added.
631 name
= unique_zone_name()
632 nsec3param
= '1 0 100 aabbccddeeff'
633 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
,
638 for k
in ('dnssec', 'nsec3param', 'nsec3narrow'):
639 self
.assertIn(k
, data
)
641 self
.assertEqual(data
[k
], payload
[k
])
643 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3NARROW'))
649 self
.assertEqual(r
.status_code
, 200)
650 self
.assertEqual(len(data
['metadata']), 1)
651 self
.assertEqual(data
['kind'], 'NSEC3NARROW')
652 self
.assertEqual(data
['metadata'][0], '1')
654 def test_create_zone_with_nsec3param_switch_to_nsec(self
):
656 Create a zone with "nsec3param", then remove the params
658 name
, payload
, data
= self
.create_zone(dnssec
=True,
659 nsec3param
='1 0 1 ab')
660 self
.put_zone(name
, {'nsec3param': ''})
662 data
= self
.get_zone(name
)
663 self
.assertEqual(data
['nsec3param'], '')
665 def test_create_zone_without_dnssec_unset_nsec3parm(self
):
667 Create a non dnssec zone and set an empty "nsec3param"
669 name
, payload
, data
= self
.create_zone(dnssec
=False)
670 self
.put_zone(name
, {'nsec3param': ''})
672 def test_create_zone_without_dnssec_set_nsec3parm(self
):
674 Create a non dnssec zone and set "nsec3param"
676 name
, payload
, data
= self
.create_zone(dnssec
=False)
677 self
.put_zone(name
, {'nsec3param': '1 0 1 ab'}, expect_error
=True)
679 def test_create_zone_dnssec_serial(self
):
681 Create a zone, then set and unset "dnssec", then check if the serial was increased
684 name
, payload
, data
= self
.create_zone()
686 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
687 self
.assertEqual(soa_serial
[-2:], '01')
689 self
.put_zone(name
, {'dnssec': True})
691 data
= self
.get_zone(name
)
692 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
693 self
.assertEqual(soa_serial
[-2:], '02')
695 self
.put_zone(name
, {'dnssec': False})
697 data
= self
.get_zone(name
)
698 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
699 self
.assertEqual(soa_serial
[-2:], '03')
701 def test_zone_absolute_url(self
):
703 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
706 self
.assertTrue(rdata
[0]['url'].startswith('/api/v'))
708 def test_create_zone_metadata(self
):
709 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
710 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
711 data
=json
.dumps(payload_metadata
))
713 self
.assertEqual(r
.status_code
, 201)
714 self
.assertEqual(rdata
["metadata"], payload_metadata
["metadata"])
716 def test_create_zone_metadata_kind(self
):
717 payload_metadata
= {"metadata": ["127.0.0.2"]}
718 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
719 data
=json
.dumps(payload_metadata
))
721 self
.assertEqual(r
.status_code
, 200)
722 self
.assertEqual(rdata
["metadata"], payload_metadata
["metadata"])
def test_create_protected_zone_metadata(self):
    """PUT to each protected metadata kind of example.com must yield 422."""
    # test whether it prevents modification of certain kinds
    protected_kinds = ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT")
    # The request body is the same for every kind, so serialize it once.
    body = json.dumps({"metadata": ["FOO", "BAR"]})
    for k in protected_kinds:
        r = self.session.put(
            self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k),
            data=body)
        # Name the kind in the failure message so a regression is easy to pin down.
        self.assertEqual(r.status_code, 422, "%s: %r" % (k, r.content))
732 def test_retrieve_zone_metadata(self
):
733 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
734 self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
735 data
=json
.dumps(payload_metadata
))
736 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"))
738 self
.assertEqual(r
.status_code
, 200)
739 self
.assertIn(payload_metadata
, rdata
)
741 def test_delete_zone_metadata(self
):
742 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
743 self
.assertEqual(r
.status_code
, 204)
744 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
746 self
.assertEqual(r
.status_code
, 200)
747 self
.assertEqual(rdata
["metadata"], [])
749 def test_create_external_zone_metadata(self
):
750 payload_metadata
= {"metadata": ["My very important message"]}
751 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
752 data
=json
.dumps(payload_metadata
))
753 self
.assertEqual(r
.status_code
, 200)
755 self
.assertEqual(rdata
["metadata"], payload_metadata
["metadata"])
def test_create_metadata_in_non_existent_zone(self):
    """POSTing metadata for the zone idonotexist.123.456.example. must yield 404."""
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
        data=json.dumps(payload_metadata))
    # Include the response body in the failure output, as the helper methods do.
    self.assertEqual(r.status_code, 404, r.content)
    # Note: errors should probably contain json (see #5988)
    # self.assertIn('Could not find domain ', r.json()['error'])
765 def test_create_slave_zone(self
):
766 # Test that nameservers can be absent for slave zones.
767 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
768 for k
in ('name', 'masters', 'kind'):
769 self
.assertIn(k
, data
)
770 self
.assertEqual(data
[k
], payload
[k
])
771 print("payload:", payload
)
773 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
774 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
776 print("zonelist:", zonelist
)
777 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
778 # Also test that fetching the zone works.
779 data
= self
.get_zone(data
['id'])
780 print("zone (fetched):", data
)
781 for k
in ('name', 'masters', 'kind'):
782 self
.assertIn(k
, data
)
783 self
.assertEqual(data
[k
], payload
[k
])
784 self
.assertEqual(data
['serial'], 0)
785 self
.assertEqual(data
['rrsets'], [])
787 def test_create_consumer_zone(self
):
788 # Test that nameservers can be absent for consumer zones.
789 _
, payload
, data
= self
.create_zone(kind
='Consumer', nameservers
=None, masters
=['127.0.0.2'])
790 print("payload:", payload
)
792 # Because consumer zones don't get a SOA, we need to test that they'll show up in the zone list.
793 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
795 print("zonelist:", zonelist
)
796 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
797 # Also test that fetching the zone works.
798 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
800 print("zone (fetched):", data
)
801 for k
in ('name', 'masters', 'kind'):
802 self
.assertIn(k
, data
)
803 self
.assertEqual(data
[k
], payload
[k
])
804 self
.assertEqual(data
['serial'], 0)
805 self
.assertEqual(data
['rrsets'], [])
def test_create_consumer_zone_no_nameservers(self):
    """nameservers must be absent for Consumer zones"""
    # create_zone is given the error text to expect for this request.
    self.create_zone(
        kind="Consumer",
        nameservers=["127.0.0.1"],
        expect_error="Nameservers MUST NOT be given for Consumer zones")
811 def test_create_consumer_zone_no_rrsets(self
):
812 """rrsets must be absent for Consumer zones"""
818 "content": "ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600",
822 self
.create_zone(kind
="Consumer", nameservers
=None, rrsets
=rrsets
, expect_error
="Zone data MUST NOT be given for Consumer zones")
824 def test_find_zone_by_name(self
):
825 name
= 'foo/' + unique_zone_name()
826 name
, payload
, data
= self
.create_zone(name
=name
)
827 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones?zone=" + name
))
830 self
.assertEqual(data
[0]['name'], name
)
832 def test_delete_slave_zone(self
):
833 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
834 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
837 def test_delete_consumer_zone(self
):
838 name
, payload
, data
= self
.create_zone(kind
='Consumer', nameservers
=None, masters
=['127.0.0.2'])
839 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
842 def test_retrieve_slave_zone(self
):
843 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
844 print("payload:", payload
)
846 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/axfr-retrieve"))
848 print("status for axfr-retrieve:", data
)
849 self
.assertEqual(data
['result'], u
'Added retrieval request for \'' + payload
['name'] +
850 '\' from primary 127.0.0.2')
852 def test_notify_master_zone(self
):
853 name
, payload
, data
= self
.create_zone(kind
='Master')
854 print("payload:", payload
)
856 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/notify"))
858 print("status for notify:", data
)
859 self
.assertEqual(data
['result'], 'Notification queued')
861 def test_get_zone_with_symbols(self
):
862 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
863 name
= payload
['name']
864 zone_id
= (name
.replace('/', '=2F'))
865 data
= self
.get_zone(zone_id
)
866 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
867 self
.assertIn(k
, data
)
869 self
.assertEqual(data
[k
], payload
[k
])
871 def test_get_zone(self
):
872 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
874 example_com
= [domain
for domain
in domains
if domain
['name'] == u
'example.com.'][0]
875 data
= self
.get_zone(example_com
['id'])
876 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
877 self
.assertIn(k
, data
)
878 self
.assertEqual(data
['name'], 'example.com.')
880 def test_get_zone_rrset(self
):
881 name
= 'host-18000.example.com.'
882 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
884 example_com
= [domain
for domain
in domains
if domain
['name'] == u
'example.com.'][0]
886 # verify single record from name that has a single record
887 data
= self
.get_zone(example_com
['id'], rrset_name
="host-18000.example.com.")
888 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
889 self
.assertIn(k
, data
)
890 self
.assertEqual(data
['rrsets'],
898 'content': '192.168.1.80',
908 # disable previous record
910 'changetype': 'replace',
917 'content': '192.168.1.80',
922 payload
= {'rrsets': [rrset
]}
923 r
= self
.session
.patch(
924 self
.url("/api/v1/servers/localhost/zones/example.com"),
925 data
=json
.dumps(payload
),
926 headers
={'content-type': 'application/json'})
927 self
.assert_success(r
)
929 # verify that the changed record is not found when asking for
930 # disabled records not to be included
931 data
= self
.get_zone(example_com
['id'], rrset_name
="host-18000.example.com.", include_disabled
="false")
932 self
.assertEqual(len(data
['rrsets']), 0)
934 # verify that the changed record is found when explicitly asking for
935 # disabled records, and by default.
936 data
= self
.get_zone(example_com
['id'], rrset_name
="host-18000.example.com.", include_disabled
="true")
937 self
.assertEqual(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
938 data
= self
.get_zone(example_com
['id'], rrset_name
="host-18000.example.com.")
939 self
.assertEqual(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
941 # verify two RRsets from a name that has two types with one record each
942 powerdnssec_org
= [domain
for domain
in domains
if domain
['name'] == u
'powerdnssec.org.'][0]
943 data
= self
.get_zone(powerdnssec_org
['id'], rrset_name
="localhost.powerdnssec.org.")
944 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
945 self
.assertIn(k
, data
)
946 self
.assertEqual(sorted(data
['rrsets'], key
=operator
.itemgetter('type')),
950 'name': 'localhost.powerdnssec.org.',
954 'content': '127.0.0.1',
963 'name': 'localhost.powerdnssec.org.',
977 # verify one RRset with one record from a name that has two, then filtered by type
978 data
= self
.get_zone(powerdnssec_org
['id'], rrset_name
="localhost.powerdnssec.org.", rrset_type
="AAAA")
979 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
980 self
.assertIn(k
, data
)
981 self
.assertEqual(data
['rrsets'],
985 'name': 'localhost.powerdnssec.org.',
999 def test_import_zone_broken(self
):
1001 'name': 'powerdns-broken.com',
1005 payload
['zone'] = """
1006 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
1007 flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
1008 ;; WARNING: recursion requested but not available
1010 ;; OPT PSEUDOSECTION:
1011 ; EDNS: version: 0, flags:; udp: 1680
1012 ;; QUESTION SECTION:
1013 ;powerdns.com. IN SOA
1016 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
1017 powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
1018 powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
1019 powerdns-broken.com. 86400 IN A 82.94.213.34
1020 powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
1021 powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
1022 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
1024 r
= self
.session
.post(
1025 self
.url("/api/v1/servers/localhost/zones"),
1026 data
=json
.dumps(payload
),
1027 headers
={'content-type': 'application/json'})
1028 self
.assertEqual(r
.status_code
, 422)
1030 def test_import_zone_axfr_outofzone(self
):
1031 # Ensure we don't create out-of-zone records
1033 'name': unique_zone_name(),
1037 payload
['zone'] = """
1038 %NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
1039 %NAME% 3600 IN NS powerdnssec2.ds9a.nl.
1040 example.org. 3600 IN AAAA 2001:888:2000:1d::2
1041 %NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
1042 """.replace('%NAME%', payload
['name'])
1043 r
= self
.session
.post(
1044 self
.url("/api/v1/servers/localhost/zones"),
1045 data
=json
.dumps(payload
),
1046 headers
={'content-type': 'application/json'})
1047 self
.assertEqual(r
.status_code
, 422)
1048 self
.assertEqual(r
.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
1050 def test_import_zone_axfr(self
):
1052 'name': 'powerdns.com.',
1055 'soa_edit_api': '', # turn off so exact SOA comparison works.
1057 payload
['zone'] = """
1058 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
1059 ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
1060 ;; WARNING: recursion requested but not available
1062 ;; OPT PSEUDOSECTION:
1063 ; EDNS: version: 0, flags:; udp: 1680
1064 ;; QUESTION SECTION:
1065 ;powerdns.com. IN SOA
1068 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
1069 powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
1070 powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
1071 powerdns.com. 86400 IN A 82.94.213.34
1072 powerdns.com. 3600 IN MX 0 xs.powerdns.com.
1073 powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
1074 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
1076 r
= self
.session
.post(
1077 self
.url("/api/v1/servers/localhost/zones"),
1078 data
=json
.dumps(payload
),
1079 headers
={'content-type': 'application/json'})
1080 self
.assert_success_json(r
)
1082 self
.assertIn('name', data
)
1086 {'content': 'powerdnssec1.ds9a.nl.'},
1087 {'content': 'powerdnssec2.ds9a.nl.'},
1090 {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
1093 {'content': '0 xs.powerdns.com.'},
1096 {'content': '82.94.213.34', 'name': 'powerdns.com.'},
1099 {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
1103 eq_zone_rrsets(data
['rrsets'], expected
)
1105 if not is_auth_lmdb():
1106 # check content in DB is stored WITHOUT trailing dot.
1107 dbrecs
= get_db_records(payload
['name'], 'NS')
1108 dbrec
= next((dbrec
for dbrec
in dbrecs
if dbrec
['content'].startswith('powerdnssec1')))
1109 self
.assertEqual(dbrec
['content'], 'powerdnssec1.ds9a.nl')
1111 def test_import_zone_bind(self
):
1113 'name': 'example.org.',
1116 'soa_edit_api': '', # turn off so exact SOA comparison works.
1118 payload
['zone'] = """
1119 $TTL 86400 ; 24 hours could have been written as 24h or 1d
1120 ; $TTL used for all RRs without explicit TTL value
1121 $ORIGIN example.org.
1122 @ 1D IN SOA ns1.example.org. hostmaster.example.org. (
1129 IN NS ns1.example.org. ; in the domain
1130 IN NS ns2.smokeyjoe.com. ; external to domain
1131 IN MX 10 mail.another.com. ; external mail provider
1132 ; server host definitions
1133 ns1 IN A 192.168.0.1 ;name server definition
1134 www IN A 192.168.0.2 ;web server definition
1135 ftp IN CNAME www.example.org. ;ftp server definition
1136 ; non server domain hosts
1137 bill IN A 192.168.0.3
1138 fred IN A 192.168.0.4
1140 r
= self
.session
.post(
1141 self
.url("/api/v1/servers/localhost/zones"),
1142 data
=json
.dumps(payload
),
1143 headers
={'content-type': 'application/json'})
1144 self
.assert_success_json(r
)
1146 self
.assertIn('name', data
)
1150 {'content': 'ns1.example.org.'},
1151 {'content': 'ns2.smokeyjoe.com.'},
1154 {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
1157 {'content': '10 mail.another.com.'},
1160 {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
1161 {'content': '192.168.0.2', 'name': 'www.example.org.'},
1162 {'content': '192.168.0.3', 'name': 'bill.example.org.'},
1163 {'content': '192.168.0.4', 'name': 'fred.example.org.'},
1166 {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
1170 eq_zone_rrsets(data
['rrsets'], expected
)
1172 def test_import_zone_bind_cname_apex(self
):
1174 'name': unique_zone_name(),
1178 payload
['zone'] = """
1180 @ IN SOA ns1.example.org. hostmaster.example.org. (2002022401 3H 15 1W 3H)
1181 @ IN NS ns1.example.org.
1182 @ IN NS ns2.smokeyjoe.com.
1183 @ IN CNAME www.example.org.
1184 """.replace('%NAME%', payload
['name'])
1185 r
= self
.session
.post(
1186 self
.url("/api/v1/servers/localhost/zones"),
1187 data
=json
.dumps(payload
),
1188 headers
={'content-type': 'application/json'})
1189 self
.assertEqual(r
.status_code
, 422)
1190 self
.assertIn('Conflicts with another RRset', r
.json()['error'])
def test_export_zone_json(self):
    """Export a freshly created zone as JSON and compare its record lines.

    The zone is created with two explicit nameservers and with
    ``soa_edit_api`` set to the empty string, and the exported text is
    compared against the three lines (two NS, one SOA) listed below.
    """
    name, _, _ = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    export_url = self.url("/api/v1/servers/localhost/zones/" + name + "/export")
    r = self.session.get(export_url, headers={'accept': 'application/json;q=0.9,*/*;q=0.8'})
    self.assert_success_json(r)
    data = r.json()
    self.assertIn('zone', data)
    # The export is one tab-separated line per record; assertCountEqual
    # makes the comparison independent of line order.
    soa_line = (name + '\t3600\tIN\tSOA\ta.misconfigured.dns.server.invalid. hostmaster.' + name +
                ' 0 10800 3600 604800 3600')
    expected_data = [
        name + '\t3600\tIN\tNS\tns1.foo.com.',
        name + '\t3600\tIN\tNS\tns2.foo.com.',
        soa_line,
    ]
    exported_lines = data['zone'].strip().split('\n')
    self.assertCountEqual(exported_lines, expected_data)
1208 def test_import_zone_consumer(self
):
1210 $NAME$ 1D IN SOA ns1.example.org. hostmaster.example.org. (
1218 self
.create_zone(kind
="Consumer", nameservers
=[], zone
=zonestring
, expect_error
="Zone data MUST NOT be given for Consumer zones")
def test_export_zone_text(self):
    """Export a freshly created zone as plain text and compare its record lines.

    Same scenario as the JSON export test, but the request accepts any
    content type and the response body itself is the zone text.
    """
    name, _, _ = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    r = self.session.get(
        self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
        headers={'accept': '*/*'}
    )
    # Check the HTTP status before looking at the body: without this a
    # failed export only shows up as a confusing list mismatch below.
    # NOTE(review): assert_success is the helper the other tests in this
    # file call on non-JSON responses; presumably it raises on a non-2xx
    # status - confirm against the test base class.
    self.assert_success(r)
    data = r.text.strip().split("\n")
    # One tab-separated line per record; order is not significant.
    expected_data = [
        name + '\t3600\tIN\tNS\tns1.foo.com.',
        name + '\t3600\tIN\tNS\tns2.foo.com.',
        name + '\t3600\tIN\tSOA\ta.misconfigured.dns.server.invalid. hostmaster.' + name +
        ' 0 10800 3600 604800 3600',
    ]
    self.assertCountEqual(data, expected_data)
1234 def test_update_zone(self
):
1235 name
, payload
, zone
= self
.create_zone()
1236 name
= payload
['name']
1237 # update, set as Master and enable SOA-EDIT-API
1240 'masters': ['192.0.2.1', '192.0.2.2'],
1241 'catalog': 'catalog.invalid.',
1242 'soa_edit_api': 'EPOCH',
1245 self
.put_zone(name
, payload
)
1246 data
= self
.get_zone(name
)
1247 for k
in payload
.keys():
1248 self
.assertIn(k
, data
)
1249 self
.assertEqual(data
[k
], payload
[k
])
1250 # update, back to Native and empty(off)
1257 self
.put_zone(name
, payload
)
1258 data
= self
.get_zone(name
)
1259 for k
in payload
.keys():
1260 self
.assertIn(k
, data
)
1261 self
.assertEqual(data
[k
], payload
[k
])
1263 def test_zone_rr_update(self
):
1264 name
, payload
, zone
= self
.create_zone()
1265 # do a replace (= update)
1267 'changetype': 'replace',
1273 "content": "ns1.bar.com.",
1277 "content": "ns2-disabled.bar.com.",
1282 payload
= {'rrsets': [rrset
]}
1283 r
= self
.session
.patch(
1284 self
.url("/api/v1/servers/localhost/zones/" + name
),
1285 data
=json
.dumps(payload
),
1286 headers
={'content-type': 'application/json'})
1287 self
.assert_success(r
)
1288 # verify that (only) the new record is there
1289 data
= self
.get_zone(name
)
1290 self
.assertCountEqual(get_rrset(data
, name
, 'NS')['records'], rrset
['records'])
1292 def test_zone_rr_update_mx(self
):
1293 # Important to test with MX records, as they have a priority field, which must end up in the content field.
1294 name
, payload
, zone
= self
.create_zone()
1295 # do a replace (= update)
1297 'changetype': 'replace',
1303 "content": "10 mail.example.org.",
1308 payload
= {'rrsets': [rrset
]}
1309 r
= self
.session
.patch(
1310 self
.url("/api/v1/servers/localhost/zones/" + name
),
1311 data
=json
.dumps(payload
),
1312 headers
={'content-type': 'application/json'})
1313 self
.assert_success(r
)
1314 # verify that (only) the new record is there
1315 data
= self
.get_zone(name
)
1316 self
.assertEqual(get_rrset(data
, name
, 'MX')['records'], rrset
['records'])
1318 def test_zone_rr_update_invalid_mx(self
):
1319 name
, payload
, zone
= self
.create_zone()
1320 # do a replace (= update)
1322 'changetype': 'replace',
1328 "content": "10 mail@mx.example.org.",
1333 payload
= {'rrsets': [rrset
]}
1334 r
= self
.session
.patch(
1335 self
.url("/api/v1/servers/localhost/zones/" + name
),
1336 data
=json
.dumps(payload
),
1337 headers
={'content-type': 'application/json'})
1338 self
.assertEqual(r
.status_code
, 422)
1339 self
.assertIn('non-hostname content', r
.json()['error'])
1340 data
= self
.get_zone(name
)
1341 self
.assertIsNone(get_rrset(data
, name
, 'MX'))
1343 def test_zone_rr_update_opt(self
):
1344 name
, payload
, zone
= self
.create_zone()
1345 # do a replace (= update)
1347 'changetype': 'replace',
1358 payload
= {'rrsets': [rrset
]}
1359 r
= self
.session
.patch(
1360 self
.url("/api/v1/servers/localhost/zones/" + name
),
1361 data
=json
.dumps(payload
),
1362 headers
={'content-type': 'application/json'})
1363 self
.assertEqual(r
.status_code
, 422)
1364 self
.assertIn('OPT: invalid type given', r
.json()['error'])
1366 def test_zone_rr_update_multiple_rrsets(self
):
1367 name
, payload
, zone
= self
.create_zone()
1369 'changetype': 'replace',
1376 "content": "ns9999.example.com.",
1382 'changetype': 'replace',
1388 "content": "10 mx444.example.com.",
1393 payload
= {'rrsets': [rrset1
, rrset2
]}
1394 r
= self
.session
.patch(
1395 self
.url("/api/v1/servers/localhost/zones/" + name
),
1396 data
=json
.dumps(payload
),
1397 headers
={'content-type': 'application/json'})
1398 self
.assert_success(r
)
1399 # verify that all rrsets have been updated
1400 data
= self
.get_zone(name
)
1401 self
.assertEqual(get_rrset(data
, name
, 'NS')['records'], rrset1
['records'])
1402 self
.assertEqual(get_rrset(data
, name
, 'MX')['records'], rrset2
['records'])
1404 def test_zone_rr_update_duplicate_record(self
):
1405 name
, payload
, zone
= self
.create_zone()
1407 'changetype': 'replace',
1412 {"content": "ns9999.example.com.", "disabled": False},
1413 {"content": "ns9996.example.com.", "disabled": False},
1414 {"content": "ns9987.example.com.", "disabled": False},
1415 {"content": "ns9988.example.com.", "disabled": False},
1416 {"content": "ns9999.example.com.", "disabled": False},
1419 payload
= {'rrsets': [rrset
]}
1420 r
= self
.session
.patch(
1421 self
.url("/api/v1/servers/localhost/zones/" + name
),
1422 data
=json
.dumps(payload
),
1423 headers
={'content-type': 'application/json'})
1424 self
.assertEqual(r
.status_code
, 422)
1425 self
.assertIn('Duplicate record in RRset', r
.json()['error'])
1427 def test_zone_rr_update_duplicate_rrset(self
):
1428 name
, payload
, zone
= self
.create_zone()
1430 'changetype': 'replace',
1436 "content": "ns9999.example.com.",
1442 'changetype': 'replace',
1448 "content": "ns9998.example.com.",
1453 payload
= {'rrsets': [rrset1
, rrset2
]}
1454 r
= self
.session
.patch(
1455 self
.url("/api/v1/servers/localhost/zones/" + name
),
1456 data
=json
.dumps(payload
),
1457 headers
={'content-type': 'application/json'})
1458 self
.assertEqual(r
.status_code
, 422)
1459 self
.assertIn('Duplicate RRset', r
.json()['error'])
def test_zone_rr_delete(self):
    """Delete the apex NS rrset via PATCH and check that it is gone."""
    name, _, _ = self.create_zone()
    # do a delete of all NS records (these are created with the zone)
    ns_removal = {
        'changetype': 'delete',
        'name': name,
        'type': 'NS'
    }
    body = json.dumps({'rrsets': [ns_removal]})
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    r = self.session.patch(zone_url, data=body, headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that the records are gone
    zone_after = self.get_zone(name)
    self.assertIsNone(get_rrset(zone_after, name, 'NS'))
1479 def test_zone_rr_update_rrset_combine_replace_and_delete(self
):
1480 name
, payload
, zone
= self
.create_zone()
1482 'changetype': 'delete',
1483 'name': 'sub.' + name
,
1487 'changetype': 'replace',
1488 'name': 'sub.' + name
,
1493 "content": "www.example.org.",
1498 payload
= {'rrsets': [rrset1
, rrset2
]}
1499 r
= self
.session
.patch(
1500 self
.url("/api/v1/servers/localhost/zones/" + name
),
1501 data
=json
.dumps(payload
),
1502 headers
={'content-type': 'application/json'})
1503 self
.assert_success(r
)
1504 # verify that (only) the new record is there
1505 data
= self
.get_zone(name
)
1506 self
.assertEqual(get_rrset(data
, 'sub.' + name
, 'CNAME')['records'], rrset2
['records'])
1508 def test_zone_disable_reenable(self
):
1509 # This also tests that SOA-EDIT-API works.
1510 name
, payload
, zone
= self
.create_zone(soa_edit_api
='EPOCH')
1511 # disable zone by disabling SOA
1513 'changetype': 'replace',
1519 "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
1524 payload
= {'rrsets': [rrset
]}
1525 r
= self
.session
.patch(
1526 self
.url("/api/v1/servers/localhost/zones/" + name
),
1527 data
=json
.dumps(payload
),
1528 headers
={'content-type': 'application/json'})
1529 self
.assert_success(r
)
1530 # check SOA serial has been edited
1531 data
= self
.get_zone(name
)
1532 soa_serial1
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1533 self
.assertNotEqual(soa_serial1
, '1')
1534 # make sure domain is still in zone list (disabled SOA!)
1535 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
1537 self
.assertEqual(len([domain
for domain
in domains
if domain
['name'] == name
]), 1)
1538 # sleep 1sec to ensure the EPOCH value changes for the next request
1540 # verify that modifying it still works
1541 rrset
['records'][0]['disabled'] = False
1542 payload
= {'rrsets': [rrset
]}
1543 r
= self
.session
.patch(
1544 self
.url("/api/v1/servers/localhost/zones/" + name
),
1545 data
=json
.dumps(payload
),
1546 headers
={'content-type': 'application/json'})
1547 self
.assert_success(r
)
1548 # check SOA serial has been edited again
1549 data
= self
.get_zone(name
)
1550 soa_serial2
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1551 self
.assertNotEqual(soa_serial2
, '1')
1552 self
.assertNotEqual(soa_serial2
, soa_serial1
)
1554 def test_zone_rr_update_out_of_zone(self
):
1555 name
, payload
, zone
= self
.create_zone()
1556 # replace with qname mismatch
1558 'changetype': 'replace',
1559 'name': 'not-in-zone.',
1564 "content": "ns1.bar.com.",
1569 payload
= {'rrsets': [rrset
]}
1570 r
= self
.session
.patch(
1571 self
.url("/api/v1/servers/localhost/zones/" + name
),
1572 data
=json
.dumps(payload
),
1573 headers
={'content-type': 'application/json'})
1574 self
.assertEqual(r
.status_code
, 422)
1575 self
.assertIn('out of zone', r
.json()['error'])
1577 def test_zone_rr_update_restricted_chars(self
):
1578 name
, payload
, zone
= self
.create_zone()
1579 # replace with qname mismatch
1581 'changetype': 'replace',
1582 'name': 'test:' + name
,
1587 "content": "ns1.bar.com.",
1592 payload
= {'rrsets': [rrset
]}
1593 r
= self
.session
.patch(
1594 self
.url("/api/v1/servers/localhost/zones/" + name
),
1595 data
=json
.dumps(payload
),
1596 headers
={'content-type': 'application/json'})
1597 self
.assertEqual(r
.status_code
, 422)
1598 self
.assertIn('contains unsupported characters', r
.json()['error'])
1600 def test_rrset_unknown_type(self
):
1601 name
, payload
, zone
= self
.create_zone()
1603 'changetype': 'replace',
1609 "content": "4.3.2.1",
1614 payload
= {'rrsets': [rrset
]}
1615 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1616 headers
={'content-type': 'application/json'})
1617 self
.assertEqual(r
.status_code
, 422)
1618 self
.assertIn('unknown type', r
.json()['error'])
1620 @parameterized.expand([
1623 def test_rrset_exclusive_and_other(self
, qtype
):
1624 name
, payload
, zone
= self
.create_zone()
1626 'changetype': 'replace',
1632 "content": "example.org.",
1637 payload
= {'rrsets': [rrset
]}
1638 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1639 headers
={'content-type': 'application/json'})
1640 self
.assertEqual(r
.status_code
, 422)
1641 self
.assertIn('Conflicts with pre-existing RRset', r
.json()['error'])
1643 @parameterized.expand([
1646 def test_rrset_other_and_exclusive(self
, qtype
):
1647 name
, payload
, zone
= self
.create_zone()
1649 'changetype': 'replace',
1650 'name': 'sub.'+name
,
1655 "content": "example.org.",
1660 payload
= {'rrsets': [rrset
]}
1661 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1662 headers
={'content-type': 'application/json'})
1663 self
.assert_success(r
)
1665 'changetype': 'replace',
1666 'name': 'sub.'+name
,
1671 "content": "1.2.3.4",
1676 payload
= {'rrsets': [rrset
]}
1677 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1678 headers
={'content-type': 'application/json'})
1679 self
.assertEqual(r
.status_code
, 422)
1680 self
.assertIn('Conflicts with pre-existing RRset', r
.json()['error'])
1682 @parameterized.expand([
1683 ('', 'SOA', ['ns1.example.org. test@example.org. 10 10800 3600 604800 3600', 'ns2.example.org. test@example.org. 10 10800 3600 604800 3600']),
1684 ('sub.', 'CNAME', ['01.example.org.', '02.example.org.']),
1686 def test_rrset_single_qtypes(self
, label
, qtype
, contents
):
1687 name
, payload
, zone
= self
.create_zone()
1689 'changetype': 'replace',
1690 'name': label
+ name
,
1695 "content": contents
[0],
1699 "content": contents
[1],
1704 payload
= {'rrsets': [rrset
]}
1705 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1706 headers
={'content-type': 'application/json'})
1707 self
.assertEqual(r
.status_code
, 422)
1708 self
.assertIn('IN ' + qtype
+ ' has more than one record', r
.json()['error'])
1710 def test_rrset_zone_apex(self
):
1711 name
, payload
, zone
= self
.create_zone()
1713 'changetype': 'replace',
1719 "content": 'ns1.example.org. test@example.org. 10 10800 3600 604800 3600',
1725 'changetype': 'replace',
1731 "content": 'example.com.',
1737 payload
= {'rrsets': [rrset1
, rrset2
]}
1738 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1739 headers
={'content-type': 'application/json'})
1740 self
.assert_success(r
) # user should be able to create DNAME at APEX as per RFC 6672 section 2.3
1742 @parameterized.expand([
1743 ('SOA', 'ns1.example.org. test@example.org. 10 10800 3600 604800 1800'),
1744 ('DNSKEY', '257 3 8 AwEAAb/+pXOZWYQ8mv9WM5dFva8WU9jcIUdDuEjldbyfnkQ/xlrJC5zAEfhYhrea3SmIPmMTDimLqbh3/4SMTNPTUF+9+U1vpNfIRTFadqsmuU9Fddz3JqCcYwEpWbReg6DJOeyu+9oBoIQkPxFyLtIXEPGlQzrynKubn04Cx83I6NfzDTraJT3jLHKeW5PVc1ifqKzHz5TXdHHTA7NkJAa0sPcZCoNE1LpnJI/wcUpRUiuQhoLFeT1E432GuPuZ7y+agElGj0NnBxEgnHrhrnZWUbULpRa/il+Cr5Taj988HqX9Xdm6FjcP4Lbuds/44U7U8du224Q8jTrZ57Yvj4VDQKc='),
1746 def test_only_at_apex(self
, qtype
, content
):
1747 name
, payload
, zone
= self
.create_zone(soa_edit_api
='')
1749 'changetype': 'replace',
1760 payload
= {'rrsets': [rrset
]}
1761 r
= self
.session
.patch(
1762 self
.url("/api/v1/servers/localhost/zones/" + name
),
1763 data
=json
.dumps(payload
),
1764 headers
={'content-type': 'application/json'})
1765 self
.assert_success(r
)
1766 # verify that the new record is there
1767 data
= self
.get_zone(name
)
1768 self
.assertEqual(get_rrset(data
, name
, qtype
)['records'], rrset
['records'])
1771 'changetype': 'replace',
1772 'name': 'sub.' + name
,
1782 payload
= {'rrsets': [rrset
]}
1783 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1784 headers
={'content-type': 'application/json'})
1785 self
.assertEqual(r
.status_code
, 422)
1786 self
.assertIn('only allowed at apex', r
.json()['error'])
1787 data
= self
.get_zone(name
)
1788 self
.assertIsNone(get_rrset(data
, 'sub.' + name
, qtype
))
1790 @parameterized.expand([
1791 ('DS', '44030 8 2 d4c3d5552b8679faeebc317e5f048b614b2e5f607dc57f1553182d49ab2179f7'),
def test_not_allowed_at_apex(self, qtype, content):
    """Check that `qtype` is refused at the zone apex but accepted below it.

    Parameterized with a record type and a valid content string for it.
    First a replace at the apex must fail with 422 and leave no such rrset
    at the apex; then the same record at ``sub.<zone>`` must be stored.
    """
    name, _, _ = self.create_zone()
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    json_headers = {'content-type': 'application/json'}

    # NOTE(review): the name/type/ttl/records scaffolding of both rrsets
    # follows the layout used by the other rrset tests in this file; the
    # ttl value is an assumption - confirm against the original.
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": content,
                "disabled": False
            },
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(zone_url, data=json.dumps(payload), headers=json_headers)
    self.assertEqual(r.status_code, 422)
    self.assertIn('not allowed at apex', r.json()['error'])
    data = self.get_zone(name)
    # The rejected write targeted the apex, so the apex is the name that
    # must be checked. The previous check looked at 'sub.' + name, which
    # never had such an rrset and therefore could not fail.
    self.assertIsNone(get_rrset(data, name, qtype))

    rrset = {
        'changetype': 'replace',
        'name': 'sub.' + name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": content,
                "disabled": False
            },
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(zone_url, data=json.dumps(payload), headers=json_headers)
    self.assert_success(r)
    # verify that the new record is there
    data = self.get_zone(name)
    self.assertEqual(get_rrset(data, 'sub.' + name, qtype)['records'], rrset['records'])
1837 def test_rr_svcb(self
):
1838 name
, payload
, zone
= self
.create_zone()
1840 'changetype': 'replace',
1841 'name': 'svcb.' + name
,
1846 "content": '40 . mandatory=alpn alpn=h2,h3 ipv4hint=192.0.2.1,192.0.2.2 ech="dG90YWxseSBib2d1cyBlY2hjb25maWcgdmFsdWU="',
1851 payload
= {'rrsets': [rrset
]}
1852 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1853 headers
={'content-type': 'application/json'})
1854 self
.assert_success(r
)
1856 def test_rrset_ns_dname_exclude(self
):
1857 name
, payload
, zone
= self
.create_zone()
1859 'changetype': 'replace',
1860 'name': 'delegation.'+name
,
1865 "content": "ns.example.org.",
1870 payload
= {'rrsets': [rrset
]}
1871 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1872 headers
={'content-type': 'application/json'})
1873 self
.assert_success(r
)
1875 'changetype': 'replace',
1876 'name': 'delegation.'+name
,
1881 "content": "example.com.",
1886 payload
= {'rrsets': [rrset
]}
1887 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1888 headers
={'content-type': 'application/json'})
1889 self
.assertEqual(r
.status_code
, 422)
1890 self
.assertIn('Cannot have both NS and DNAME except in zone apex', r
.json()['error'])
1892 ## FIXME: Enable this when it's time for it
1893 # def test_rrset_dname_nothing_under(self):
1894 # name, payload, zone = self.create_zone()
1896 # 'changetype': 'replace',
1897 # 'name': 'delegation.'+name,
1902 # "content": "example.com.",
1907 # payload = {'rrsets': [rrset]}
1908 # r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1909 # headers={'content-type': 'application/json'})
1910 # self.assert_success(r)
1912 # 'changetype': 'replace',
1913 # 'name': 'sub.delegation.'+name,
1918 # "content": "1.2.3.4",
1923 # payload = {'rrsets': [rrset]}
1924 # r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1925 # headers={'content-type': 'application/json'})
1926 # self.assertEqual(r.status_code, 422)
1927 # self.assertIn('You cannot have record(s) under CNAME/DNAME', r.json()['error'])
1929 def test_create_zone_with_leading_space(self
):
1930 # Actual regression.
1931 name
, payload
, zone
= self
.create_zone()
1933 'changetype': 'replace',
1939 "content": " 4.3.2.1",
1944 payload
= {'rrsets': [rrset
]}
1945 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1946 headers
={'content-type': 'application/json'})
1947 self
.assertEqual(r
.status_code
, 422)
1948 self
.assertIn('Not in expected format', r
.json()['error'])
1950 @unittest.skipIf(is_auth_lmdb(), "No out-of-zone storage in LMDB")
1951 def test_zone_rr_delete_out_of_zone(self
):
1952 name
, payload
, zone
= self
.create_zone()
1954 'changetype': 'delete',
1955 'name': 'not-in-zone.',
1958 payload
= {'rrsets': [rrset
]}
1959 r
= self
.session
.patch(
1960 self
.url("/api/v1/servers/localhost/zones/" + name
),
1961 data
=json
.dumps(payload
),
1962 headers
={'content-type': 'application/json'})
1964 self
.assert_success(r
) # succeed so users can fix their wrong, old data
def test_zone_delete(self):
    """Delete a freshly created zone; expect status 204 and no Content-Type header."""
    name, _, _ = self.create_zone()
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name)
    r = self.session.delete(zone_url)
    self.assertEqual(r.status_code, 204)
    # An empty reply must not advertise a content type.
    self.assertNotIn('Content-Type', r.headers)
1972 def test_zone_comment_create(self
):
1973 name
, payload
, zone
= self
.create_zone()
1975 'changetype': 'replace',
1982 'content': 'blah blah',
1986 'content': 'blah blah bleh',
1991 'changetype': 'replace',
1998 'content': 'this should not show up later'
2002 payload
= {'rrsets': [rrset1
, rrset2
]}
2003 r
= self
.session
.patch(
2004 self
.url("/api/v1/servers/localhost/zones/" + name
),
2005 data
=json
.dumps(payload
),
2006 headers
={'content-type': 'application/json'})
2008 self
.assert_error_json(r
) # No comments in LMDB
2011 self
.assert_success(r
)
2012 # make sure the comments have been set, and that the NS
2013 # records are still present
2014 data
= self
.get_zone(name
, rrset_name
=name
, rrset_type
="NS")
2015 serverset
= get_rrset(data
, name
, 'NS')
2017 self
.assertNotEqual(serverset
['records'], [])
2018 self
.assertNotEqual(serverset
['comments'], [])
2019 # verify that modified_at has been set by pdns
2020 self
.assertNotEqual([c
for c
in serverset
['comments']][0]['modified_at'], 0)
2021 # verify that unrelated comments do not leak into the result
2022 self
.assertEqual(get_rrset(data
, name
, 'SOA'), None)
2023 # verify that TTL is correct (regression test)
2024 self
.assertEqual(serverset
['ttl'], 3600)
2026 def test_zone_comment_delete(self
):
2027 # Test: Delete ONLY comments.
2028 name
, payload
, zone
= self
.create_zone()
2030 'changetype': 'replace',
2035 payload
= {'rrsets': [rrset
]}
2036 r
= self
.session
.patch(
2037 self
.url("/api/v1/servers/localhost/zones/" + name
),
2038 data
=json
.dumps(payload
),
2039 headers
={'content-type': 'application/json'})
2040 self
.assert_success(r
)
2041 # make sure the NS records are still present
2042 data
= self
.get_zone(name
)
2043 serverset
= get_rrset(data
, name
, 'NS')
2045 self
.assertNotEqual(serverset
['records'], [])
2046 self
.assertEqual(serverset
['comments'], [])
2048 @unittest.skipIf(is_auth_lmdb(), "No comments in LMDB")
2049 def test_zone_comment_out_of_range_modified_at(self
):
2050 # Test if a modified_at outside of the 32 bit range throws an error
2051 name
, payload
, zone
= self
.create_zone()
2053 'changetype': 'replace',
2059 'content': 'oh hi there',
2060 'modified_at': '4294967297'
2064 payload
= {'rrsets': [rrset
]}
2065 r
= self
.session
.patch(
2066 self
.url("/api/v1/servers/localhost/zones/" + name
),
2067 data
=json
.dumps(payload
),
2068 headers
={'content-type': 'application/json'})
2069 self
.assertEqual(r
.status_code
, 422)
2070 self
.assertIn("Key 'modified_at' is out of range", r
.json()['error'])
2072 @unittest.skipIf(is_auth_lmdb(), "No comments in LMDB")
2073 def test_zone_comment_stay_intact(self
):
2074 # Test if comments on an rrset stay intact if the rrset is replaced
2075 name
, payload
, zone
= self
.create_zone()
2078 'changetype': 'replace',
2084 'content': 'oh hi there',
2089 payload
= {'rrsets': [rrset
]}
2090 r
= self
.session
.patch(
2091 self
.url("/api/v1/servers/localhost/zones/" + name
),
2092 data
=json
.dumps(payload
),
2093 headers
={'content-type': 'application/json'})
2094 self
.assert_success(r
)
2095 # replace rrset records
2097 'changetype': 'replace',
2103 "content": "ns1.bar.com.",
2108 payload2
= {'rrsets': [rrset2
]}
2109 r
= self
.session
.patch(
2110 self
.url("/api/v1/servers/localhost/zones/" + name
),
2111 data
=json
.dumps(payload2
),
2112 headers
={'content-type': 'application/json'})
2113 self
.assert_success(r
)
2114 # make sure the comments still exist
2115 data
= self
.get_zone(name
)
2116 serverset
= get_rrset(data
, name
, 'NS')
2118 self
.assertEqual(serverset
['records'], rrset2
['records'])
2119 self
.assertEqual(serverset
['comments'], rrset
['comments'])
2121 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
def test_search_rr_exact_zone(self):
    """Search for the exact zone name; expect the zone plus its NS and SOA records."""
    name = unique_zone_name()
    self.create_zone(name=name, serial=22, soa_edit_api='')
    # The query uses the zone name without its trailing dot.
    query = name.rstrip('.')
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + query))
    self.assert_success_json(r)
    print(r.json())
    # (type, content) of every record hit; all other fields are identical.
    record_specs = (
        ('NS', 'ns1.example.com.'),
        ('NS', 'ns2.example.com.'),
        ('SOA', 'a.misconfigured.dns.server.invalid. hostmaster.'+name+' 22 10800 3600 604800 3600'),
    )
    record_hits = [
        {'content': content,
         'zone_id': name, 'zone': name, 'object_type': 'record', 'disabled': False,
         'ttl': 3600, 'type': rtype, 'name': name}
        for rtype, content in record_specs
    ]
    zone_hit = {'object_type': 'zone', 'name': name, 'zone_id': name}
    # Result order is not part of the contract, hence assertCountEqual.
    self.assertCountEqual(r.json(), [zone_hit] + record_hits)
2141 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
def test_search_rr_exact_zone_filter_type_zone(self):
    """Search for the exact zone name with object_type=zone; expect only the zone hit."""
    name = unique_zone_name()
    data_type = "zone"
    self.create_zone(name=name, serial=22, soa_edit_api='')
    # Query by the zone name without its trailing dot, filtered by object type.
    search_path = "/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type
    r = self.session.get(self.url(search_path))
    self.assert_success_json(r)
    print(r.json())
    only_zone = [
        {'object_type': 'zone', 'name': name, 'zone_id': name},
    ]
    self.assertEqual(r.json(), only_zone)
2153 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
def test_search_rr_exact_zone_filter_type_record(self):
    """Search for the exact zone name with object_type=record; expect only record hits."""
    name = unique_zone_name()
    data_type = "record"
    self.create_zone(name=name, serial=22, soa_edit_api='')
    # Query by the zone name without its trailing dot, filtered by object type.
    search_path = "/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type
    r = self.session.get(self.url(search_path))
    self.assert_success_json(r)
    print(r.json())
    # (type, content) of every expected hit; the remaining fields are shared.
    record_specs = (
        ('NS', 'ns1.example.com.'),
        ('NS', 'ns2.example.com.'),
        ('SOA', 'a.misconfigured.dns.server.invalid. hostmaster.'+name+' 22 10800 3600 604800 3600'),
    )
    expected_hits = [
        {'content': content,
         'zone_id': name, 'zone': name, 'object_type': 'record', 'disabled': False,
         'ttl': 3600, 'type': rtype, 'name': name}
        for rtype, content in record_specs
    ]
    # Result order is not part of the contract, hence assertCountEqual.
    self.assertCountEqual(r.json(), expected_hits)
2173 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2174 def test_search_rr_substring(self
):
2175 name
= unique_zone_name()
2177 self
.create_zone(name
=name
)
2178 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
2179 self
.assert_success_json(r
)
2181 # should return zone, SOA, ns1, ns2
2182 self
.assertEqual(len(r
.json()), 4)
2184 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
def test_search_rr_case_insensitive(self):
    """Search with a differently-cased pattern and expect the zone's four hits."""
    # The zone name is created with a lower-case 'testsuffix' label ...
    name = unique_zone_name()+'testsuffix.'
    self.create_zone(name=name)
    # ... while the query spells part of it in upper case.
    search_url = self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*")
    r = self.session.get(search_url)
    self.assert_success_json(r)
    hits = r.json()
    print(hits)
    # should return zone, SOA, ns1, ns2
    self.assertEqual(len(hits), 4)
2194 @unittest.skipIf(is_auth_lmdb(), "No search or comments in LMDB")
2195 def test_search_rr_comment(self
):
2196 name
= unique_zone_name()
2202 "content": "2001:DB8::1",
2206 "account": "test AAAA",
2208 "modified_at": 11112,
2211 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=rrsets
)
2212 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=blah"))
2213 self
.assert_success_json(r
)
2215 # should return the AAAA record
2216 self
.assertEqual(len(data
), 1)
2217 self
.assertEqual(data
[0]['object_type'], 'comment')
2218 self
.assertEqual(data
[0]['type'], 'AAAA')
2219 self
.assertEqual(data
[0]['name'], name
)
2220 self
.assertEqual(data
[0]['content'], rrsets
[0]['comments'][0]['content'])
2222 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2223 def test_search_after_rectify_with_ent(self
):
2224 name
= unique_zone_name()
2225 search
= name
.split('.')[0]
2227 "name": 'sub.sub.' + name
,
2231 "content": "4.3.2.1",
2235 self
.create_zone(name
=name
, rrsets
=[rrset
])
2236 pdnsutil_rectify(name
)
2237 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
2238 self
.assert_success_json(r
)
2240 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
2241 self
.assertEqual(len(r
.json()), 5)
2243 @unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
2244 def test_default_api_rectify_dnssec(self
):
2245 name
= unique_zone_name()
2248 "name": 'a.' + name
,
2252 "content": "2001:DB8::1",
2257 "name": 'b.' + name
,
2261 "content": "2001:DB8::2",
2266 self
.create_zone(name
=name
, rrsets
=rrsets
, dnssec
=True, nsec3param
='1 0 1 ab')
2267 dbrecs
= get_db_records(name
, 'AAAA')
2268 self
.assertIsNotNone(dbrecs
[0]['ordername'])
2270 def test_default_api_rectify_nodnssec(self
):
2271 """Without any DNSSEC settings, rectify should still add ENTs. Setup the zone
2272 so ENTs are necessary, and check for their existence using sdig.
2274 name
= unique_zone_name()
2277 "name": 'a.sub.' + name
,
2281 "content": "2001:DB8::1",
2286 "name": 'b.sub.' + name
,
2290 "content": "2001:DB8::2",
2295 self
.create_zone(name
=name
, rrsets
=rrsets
)
2296 # default-api-rectify is yes (by default). expect rectify to have happened.
2297 assert 'Rcode: 0 ' in sdig('sub.' + name
, 'TXT')
2299 @unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
2300 def test_override_api_rectify(self
):
2301 name
= unique_zone_name()
2302 search
= name
.split('.')[0]
2305 "name": 'a.' + name
,
2309 "content": "2001:DB8::1",
2314 "name": 'b.' + name
,
2318 "content": "2001:DB8::2",
2323 self
.create_zone(name
=name
, rrsets
=rrsets
, api_rectify
=False, dnssec
=True, nsec3param
='1 0 1 ab')
2324 dbrecs
= get_db_records(name
, 'AAAA')
2325 self
.assertIsNone(dbrecs
[0]['ordername'])
@unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
def test_explicit_rectify_success(self):
    """Check that an explicit rectify request fills in the SOA ordername.

    The zone is created with api_rectify=False, so the SOA ordername is
    expected to be unset until PUT .../rectify has been called.
    """
    # Plain tuple unpacking. The previous chained assignment
    # (`name, _, data = self.create_zone = self.create_zone(...)`) also
    # rebound self.create_zone to the returned tuple, shadowing the helper.
    name, _, data = self.create_zone(api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')

    # Before the explicit rectify: no ordername on the SOA record.
    dbrecs = get_db_records(name, 'SOA')
    self.assertIsNone(dbrecs[0]['ordername'])

    r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/rectify"))
    self.assertEqual(r.status_code, 200)

    # After the explicit rectify: ordername must be present.
    dbrecs = get_db_records(name, 'SOA')
    self.assertIsNotNone(dbrecs[0]['ordername'])
def test_explicit_rectify_slave(self):
    """Check that an explicit rectify works on a zone switched to kind=Slave.

    Some users want to move a zone to kind=Slave and then rectify, without a
    re-transfer.
    """
    # Plain tuple unpacking. The previous chained assignment
    # (`name, _, data = self.create_zone = self.create_zone(...)`) also
    # rebound self.create_zone to the returned tuple, shadowing the helper.
    name, _, data = self.create_zone(api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
    self.put_zone(data['id'], {'kind': 'Slave'})

    r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/rectify"))
    self.assertEqual(r.status_code, 200)

    # The database-level check only runs when is_auth_lmdb() is false.
    if not is_auth_lmdb():
        dbrecs = get_db_records(name, 'SOA')
        self.assertIsNotNone(dbrecs[0]['ordername'])
2347 def test_cname_at_ent_place(self
):
2348 name
, payload
, zone
= self
.create_zone(dnssec
=True, api_rectify
=True)
2350 'changetype': 'replace',
2351 'name': 'sub2.sub1.' + name
,
2355 'content': "4.3.2.1",
2359 payload
= {'rrsets': [rrset
]}
2360 r
= self
.session
.patch(
2361 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
2362 data
=json
.dumps(payload
),
2363 headers
={'content-type': 'application/json'})
2364 self
.assertEqual(r
.status_code
, 204)
2366 'changetype': 'replace',
2367 'name': 'sub1.' + name
,
2371 'content': "www.example.org.",
2375 payload
= {'rrsets': [rrset
]}
2376 r
= self
.session
.patch(
2377 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
2378 data
=json
.dumps(payload
),
2379 headers
={'content-type': 'application/json'})
2380 self
.assertEqual(r
.status_code
, 204)
2382 def test_rrset_parameter_post_false(self
):
2383 name
= unique_zone_name()
2387 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
2389 r
= self
.session
.post(
2390 self
.url("/api/v1/servers/localhost/zones?rrsets=false"),
2391 data
=json
.dumps(payload
),
2392 headers
={'content-type': 'application/json'})
2394 self
.assert_success_json(r
)
2395 self
.assertEqual(r
.status_code
, 201)
2396 self
.assertEqual(r
.json().get('rrsets'), None)
def test_rrset_false_parameter(self):
    """Fetching a zone with rrsets="false" must not return an 'rrsets' key."""
    zone_name = unique_zone_name()
    self.create_zone(name=zone_name, kind='Native')

    fetched = self.get_zone(zone_name, rrsets="false")
    returned_rrsets = fetched.get('rrsets')
    self.assertEqual(returned_rrsets, None)
def test_rrset_true_parameter(self):
    """Fetching a zone with rrsets="true" must return exactly two rrsets."""
    zone_name = unique_zone_name()
    self.create_zone(name=zone_name, kind='Native')

    fetched = self.get_zone(zone_name, rrsets="true")
    rrset_count = len(fetched['rrsets'])
    self.assertEqual(rrset_count, 2)
2410 def test_wrong_rrset_parameter(self
):
2411 name
= unique_zone_name()
2412 self
.create_zone(name
=name
, kind
='Native')
2414 name
, rrsets
="foobar",
2415 expect_error
="'rrsets' request parameter value 'foobar' is not supported"
2418 def test_put_master_tsig_key_ids_non_existent(self
):
2419 name
= unique_zone_name()
2420 keyname
= unique_zone_name().split('.')[0]
2421 self
.create_zone(name
=name
, kind
='Native')
2423 'master_tsig_key_ids': [keyname
]
2425 self
.put_zone(name
, payload
, expect_error
='A TSIG key with the name')
2427 def test_put_slave_tsig_key_ids_non_existent(self
):
2428 name
= unique_zone_name()
2429 keyname
= unique_zone_name().split('.')[0]
2430 self
.create_zone(name
=name
, kind
='Native')
2432 'slave_tsig_key_ids': [keyname
]
2434 self
.put_zone(name
, payload
, expect_error
='A TSIG key with the name')
2436 def test_zone_replace_rrsets_basic(self
):
2437 """Basic test: all automatic modification is off, on replace the new rrsets are ingested as is."""
2438 name
, _
, _
= self
.create_zone(dnssec
=False, soa_edit
='', soa_edit_api
='')
2440 {'name': name
, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2441 {'name': name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2442 {'name': 'www.' + name
, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
2443 {'name': 'sub.' + name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}]},
2445 self
.put_zone(name
, {'rrsets': rrsets
})
2447 data
= self
.get_zone(name
)
2448 for rrset
in rrsets
:
2449 rrset
.setdefault('comments', [])
2450 for record
in rrset
['records']:
2451 record
.setdefault('disabled', False)
2452 assert_eq_rrsets(data
['rrsets'], rrsets
)
2454 def test_zone_replace_rrsets_dnssec(self
):
2455 """With dnssec: check automatic rectify is done"""
2456 name
, _
, _
= self
.create_zone(dnssec
=True)
2458 {'name': name
, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2459 {'name': name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2460 {'name': 'www.' + name
, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
2462 self
.put_zone(name
, {'rrsets': rrsets
})
2464 if not is_auth_lmdb():
2465 # lmdb: skip, no get_db_records implementations
2466 dbrecs
= get_db_records(name
, 'A')
2467 assert dbrecs
[0]['ordername'] is not None # default = rectify enabled
2469 def test_zone_replace_rrsets_with_soa_edit(self
):
2470 """SOA-EDIT was enabled before rrsets will be replaced"""
2471 name
, _
, _
= self
.create_zone(soa_edit
='INCEPTION-INCREMENT', soa_edit_api
='SOA-EDIT-INCREASE')
2473 {'name': name
, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2474 {'name': name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2475 {'name': 'www.' + name
, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
2476 {'name': 'sub.' + name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}]},
2478 self
.put_zone(name
, {'rrsets': rrsets
})
2480 data
= self
.get_zone(name
)
2481 soa
= [rrset
['records'][0]['content'] for rrset
in data
['rrsets'] if rrset
['type'] == 'SOA'][0]
2482 assert int(soa
.split()[2]) > 1 # serial is larger than what we sent
2484 def test_zone_replace_rrsets_no_soa_primary(self
):
2485 """Replace all RRsets but supply no SOA. Should fail."""
2486 name
, _
, _
= self
.create_zone()
2488 {'name': name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]}
2490 self
.put_zone(name
, {'rrsets': rrsets
}, expect_error
='Must give SOA record for zone when replacing all RR sets')
2492 @parameterized.expand([
2495 {'name': '$NAME$', 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2498 def test_zone_replace_rrsets_secondary(self
, expected_error
, rrsets
):
2500 Replace all RRsets in a SECONDARY zone.
2502 If no SOA is given, this should still succeed, also setting zone stale (but cannot assert this here).
2504 name
, _
, _
= self
.create_zone(kind
='Secondary', nameservers
=None, masters
=['127.0.0.2'])
2505 self
.put_zone(name
, {'rrsets': templated_rrsets(rrsets
, name
)}, expect_error
=expected_error
)
2507 @parameterized.expand([
2509 ("Modifying RRsets in Consumer zones is unsupported", [
2510 {'name': '$NAME$', 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2513 def test_zone_replace_rrsets_consumer(self
, expected_error
, rrsets
):
2514 name
, _
, _
= self
.create_zone(kind
='Consumer', nameservers
=None, masters
=['127.0.0.2'])
2515 self
.put_zone(name
, {'rrsets': templated_rrsets(rrsets
, name
)}, expect_error
=expected_error
)
2517 def test_zone_replace_rrsets_negative_ttl(self
):
2518 name
, _
, _
= self
.create_zone(dnssec
=False, soa_edit
='', soa_edit_api
='')
2520 {'name': name
, 'type': 'SOA', 'ttl': -1, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2522 self
.put_zone(name
, {'rrsets': rrsets
}, expect_error
="Key 'ttl' is not a positive Integer")
2524 @parameterized.expand([
2525 ("IN MX: non-hostname content", [{'name': '$NAME$', 'type': 'MX', 'ttl': 3600, 'records': [{"content": "10 mail@mx.example.org."}]}]),
2526 ("out of zone", [{'name': 'not-in-zone.', 'type': 'NS', 'ttl': 3600, 'records': [{"content": "ns1.example.org."}]}]),
2527 ("contains unsupported characters", [{'name': 'test:.$NAME$', 'type': 'NS', 'ttl': 3600, 'records': [{"content": "ns1.example.org."}]}]),
2528 ("unknown type", [{'name': '$NAME$', 'type': 'INVALID', 'ttl': 3600, 'records': [{"content": "192.0.2.1"}]}]),
2529 ("Conflicts with another RRset", [{'name': '$NAME$', 'type': 'CNAME', 'ttl': 3600, 'records': [{"content": "example.org."}]}]),
2531 def test_zone_replace_rrsets_invalid(self
, expected_error
, invalid_rrsets
):
2532 """Test validation of RRsets before replacing them"""
2533 name
, _
, _
= self
.create_zone(dnssec
=False, soa_edit
='', soa_edit_api
='')
2535 {'name': name
, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2536 {'name': name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2538 rrsets
= base_rrsets
+ templated_rrsets(invalid_rrsets
, name
)
2539 self
.put_zone(name
, {'rrsets': rrsets
}, expect_error
=expected_error
)
2542 @unittest.skipIf(not is_auth(), "Not applicable")
2543 class AuthRootZone(ApiTestCase
, AuthZonesHelperMixin
):
2546 super(AuthRootZone
, self
).setUp()
2547 # zone name is not unique, so delete the zone before each individual test.
2548 self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/=2E"))
2550 def test_create_zone(self
):
2551 name
, payload
, data
= self
.create_zone(name
='.', serial
=22, soa_edit_api
='')
2552 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
2553 self
.assertIn(k
, data
)
2555 self
.assertEqual(data
[k
], payload
[k
])
2556 # validate generated SOA
2557 rec
= get_first_rec(data
, '.', 'SOA')
2560 "a.misconfigured.dns.server.invalid. hostmaster. " + str(payload
['serial']) +
2561 " 10800 3600 604800 3600"
2563 # Regression test: verify zone list works
2564 zonelist
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones")).json()
2565 print("zonelist:", zonelist
)
2566 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
2567 # Also test that fetching the zone works.
2568 print("id:", data
['id'])
2569 self
.assertEqual(data
['id'], '=2E')
2570 data
= self
.get_zone(data
['id'])
2571 print("zone (fetched):", data
)
2572 for k
in ('name', 'kind'):
2573 self
.assertIn(k
, data
)
2574 self
.assertEqual(data
[k
], payload
[k
])
2575 self
.assertEqual(data
['rrsets'][0]['name'], '.')
2577 def test_update_zone(self
):
2578 name
, payload
, zone
= self
.create_zone(name
='.')
2580 # update, set as Master and enable SOA-EDIT-API
2583 'masters': ['192.0.2.1', '192.0.2.2'],
2584 'soa_edit_api': 'EPOCH',
2587 self
.put_zone(zone_id
, payload
)
2588 data
= self
.get_zone(zone_id
)
2589 for k
in payload
.keys():
2590 self
.assertIn(k
, data
)
2591 self
.assertEqual(data
[k
], payload
[k
])
2592 # update, back to Native and empty(off)
2598 self
.put_zone(zone_id
, payload
)
2599 data
= self
.get_zone(zone_id
)
2600 for k
in payload
.keys():
2601 self
.assertIn(k
, data
)
2602 self
.assertEqual(data
[k
], payload
[k
])
2605 @unittest.skipIf(not is_recursor(), "Not applicable")
2606 class RecursorZones(ApiTestCase
):
2608 def create_zone(self
, name
=None, kind
=None, rd
=False, servers
=None, notify_allowed
=False):
2610 name
= unique_zone_name()
2617 'recursion_desired': rd
,
2618 'notify_allowed': notify_allowed
2620 r
= self
.session
.post(
2621 self
.url("/api/v1/servers/localhost/zones"),
2622 data
=json
.dumps(payload
),
2623 headers
={'content-type': 'application/json'})
2624 self
.assert_success_json(r
)
2625 return payload
, r
.json()
def test_create_auth_zone(self):
    """Create a Native zone and check the response echoes every payload field."""
    sent, received = self.create_zone(kind='Native')
    for field in sent:
        self.assertEqual(received[field], sent[field])
2632 def test_create_zone_no_name(self
):
2636 'servers': ['8.8.8.8'],
2637 'recursion_desired': False,
2640 r
= self
.session
.post(
2641 self
.url("/api/v1/servers/localhost/zones"),
2642 data
=json
.dumps(payload
),
2643 headers
={'content-type': 'application/json'})
2644 self
.assertEqual(r
.status_code
, 422)
2645 self
.assertIn('is not canonical', r
.json()['error'])
def test_create_forwarded_zone(self):
    """Create a Forwarded zone (rd=False) and compare the response with the payload."""
    sent, received = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])

    # return values are normalized: append ':53' to the first server before comparing
    servers = sent['servers']
    servers[0] = servers[0] + ':53'

    for field in sent:
        self.assertEqual(received[field], sent[field])
def test_create_forwarded_zone_notify_allowed(self):
    """Create a Forwarded zone with notify_allowed=True and compare response with payload."""
    sent, received = self.create_zone(
        kind='Forwarded',
        rd=False,
        servers=['8.8.8.8'],
        notify_allowed=True,
    )

    # return values are normalized: append ':53' to the first server before comparing
    servers = sent['servers']
    servers[0] = servers[0] + ':53'

    for field in sent:
        self.assertEqual(received[field], sent[field])
def test_create_forwarded_rd_zone(self):
    """Create a Forwarded zone with rd=True and compare the response with the payload."""
    sent, received = self.create_zone(
        name='google.com.',
        kind='Forwarded',
        rd=True,
        servers=['8.8.8.8'],
    )

    # return values are normalized: append ':53' to the first server before comparing
    servers = sent['servers']
    servers[0] = servers[0] + ':53'

    for field in sent:
        self.assertEqual(received[field], sent[field])
2668 def test_create_auth_zone_with_symbols(self
):
2669 payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name(), kind
='Native')
2670 expected_id
= (payload
['name'].replace('/', '=2F'))
2671 for k
in payload
.keys():
2672 self
.assertEqual(data
[k
], payload
[k
])
2673 self
.assertEqual(data
['id'], expected_id
)
2675 def test_rename_auth_zone(self
):
2676 payload
, data
= self
.create_zone(kind
='Native')
2677 name
= payload
['name']
2680 'name': 'renamed-'+name
,
2682 'recursion_desired': False
2684 r
= self
.session
.put(
2685 self
.url("/api/v1/servers/localhost/zones/" + name
),
2686 data
=json
.dumps(payload
),
2687 headers
={'content-type': 'application/json'})
2688 self
.assert_success(r
)
2689 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + payload
['name'])).json()
2690 for k
in payload
.keys():
2691 self
.assertEqual(data
[k
], payload
[k
])
def test_zone_delete(self):
    """Delete a freshly created zone; expect 204 and no Content-Type header."""
    sent, _ = self.create_zone(kind='Native')
    zone_url = self.url("/api/v1/servers/localhost/zones/" + sent['name'])

    response = self.session.delete(zone_url)

    self.assertEqual(response.status_code, 204)
    self.assertNotIn('Content-Type', response.headers)
2700 def test_search_rr_exact_zone(self
):
2701 name
= unique_zone_name()
2702 self
.create_zone(name
=name
, kind
='Native')
2703 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
))
2704 self
.assert_success_json(r
)
2706 self
.assertEqual(r
.json(), [{u
'type': u
'zone', u
'name': name
, u
'zone_id': name
}])
2708 def test_search_rr_substring(self
):
2709 name
= 'search-rr-zone.name.'
2710 self
.create_zone(name
=name
, kind
='Native')
2711 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
2712 self
.assert_success_json(r
)
2714 # should return zone, SOA
2715 self
.assertEqual(len(r
.json()), 2)
2717 @unittest.skipIf(not is_auth(), "Not applicable")
2718 class AuthZoneKeys(ApiTestCase
, AuthZonesHelperMixin
):
2720 def test_get_keys(self
):
2721 r
= self
.session
.get(
2722 self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2723 self
.assert_success_json(r
)
2725 self
.assertGreater(len(keys
), 0)
2727 key0
= deepcopy(keys
[0])
2731 u
'algorithm': u
'ECDSAP256SHA256',
2734 u
'type': u
'Cryptokey',
2739 self
.assertEqual(key0
, expected
)
2741 keydata
= keys
[0]['dnskey'].split()
2742 self
.assertEqual(len(keydata
), 4)
2744 def test_get_keys_with_cds(self
):
2745 payload_metadata
= {"type": "Metadata", "kind": "PUBLISH-CDS", "metadata": ["4"]}
2746 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./metadata"),
2747 data
=json
.dumps(payload_metadata
))
2749 self
.assertEqual(r
.status_code
, 201)
2750 self
.assertEqual(rdata
["metadata"], payload_metadata
["metadata"])
2752 r
= self
.session
.get(
2753 self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2754 self
.assert_success_json(r
)
2756 self
.assertGreater(len(keys
), 0)
2758 key0
= deepcopy(keys
[0])
2759 self
.assertEqual(len(key0
['cds']), 1)
2760 self
.assertIn(key0
['cds'][0], key0
['ds'])
2761 self
.assertEqual(key0
['cds'][0].split()[2], '4')
2766 u
'algorithm': u
'ECDSAP256SHA256',
2769 u
'type': u
'Cryptokey',
2774 self
.assertEqual(key0
, expected
)
2776 keydata
= keys
[0]['dnskey'].split()
2777 self
.assertEqual(len(keydata
), 4)
2779 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./metadata/PUBLISH-CDS"))
2780 self
.assertEqual(r
.status_code
, 204)