]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
1 from __future__
import print_function
6 import requests
.exceptions
7 from copy
import deepcopy
8 from parameterized
import parameterized
9 from pprint
import pprint
10 from test_helper
import ApiTestCase
, unique_zone_name
, is_auth
, is_auth_lmdb
, is_recursor
, get_db_records
, pdnsutil_rectify
, sdig
13 def get_rrset(data
, qname
, qtype
):
14 for rrset
in data
['rrsets']:
15 if rrset
['name'] == qname
and rrset
['type'] == qtype
:
20 def get_first_rec(data
, qname
, qtype
):
21 rrset
= get_rrset(data
, qname
, qtype
)
23 return rrset
['records'][0]
27 def eq_zone_rrsets(rrsets
, expected
):
30 for type_
, expected_records
in expected
.items():
32 data_got
[type_
] = set()
33 data_expected
[type_
] = set()
34 uses_name
= any(['name' in expected_record
for expected_record
in expected_records
])
35 # minify + convert received data
36 for rrset
in [rrset
for rrset
in rrsets
if rrset
['type'] == type_
]:
38 for r
in rrset
['records']:
39 data_got
[type_
].add((rrset
['name'] if uses_name
else '@', rrset
['type'], r
['content']))
40 # minify expected data
41 for r
in expected_records
:
42 data_expected
[type_
].add((r
['name'] if uses_name
else '@', type_
, r
['content']))
44 print("eq_zone_rrsets: got:")
46 print("eq_zone_rrsets: expected:")
49 assert data_got
== data_expected
, "%r != %r" % (data_got
, data_expected
)
def assert_eq_rrsets(rrsets, expected):
    """Check that two rrset lists hold the same entries, whatever their order."""

    def _name_and_type(entry):
        # Ordering key: an rrset is ordered by its owner name, then its type.
        return (entry['name'], entry['type'])

    got_sorted = sorted(rrsets, key=_name_and_type)
    want_sorted = sorted(expected, key=_name_and_type)
    assert got_sorted == want_sorted
58 def templated_rrsets(rrsets
: list, zonename
: str):
60 Replace $NAME$ in `name` and `content` of given rrsets with `zonename`.
61 Will return a copy. Original rrsets should stay unmodified.
65 new_rrset
= rrset |
{"name": rrset
["name"].replace('$NAME$', zonename
)}
67 if "records" in rrset
:
69 for record
in rrset
["records"]:
70 records
.append(record |
{"content": record
["content"].replace('$NAME$', zonename
)})
71 new_rrset
["records"] = records
73 new_rrsets
.append(new_rrset
)
78 class Zones(ApiTestCase
):
def _test_list_zones(self, dnssec=True):
    """Fetch the zone list and verify the fields reported for example.com.

    Exactly one entry named ``example.com`` / ``example.com.`` must be
    present, and it must contain every field in the required set built below.

    :param dnssec: selects which listing is requested and which fields are
        expected; see the review notes on the individual guards.
    """
    path = "/api/v1/servers/localhost/zones"
    # NOTE(review): guard reconstructed from context (the suffix disables
    # DNSSEC details) -- confirm against upstream.
    if not dnssec:
        path = path + "?dnssec=false"
    r = self.session.get(self.url(path))
    self.assert_success_json(r)
    domains = r.json()
    example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
    self.assertEqual(len(example_com), 1)
    example_com = example_com[0]

    required_fields = ['id', 'url', 'name', 'kind']
    # NOTE(review): the is_auth()/is_recursor() split is inferred from the
    # field names (auth-only vs. recursor-only) -- confirm against upstream.
    if is_auth():
        required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial', 'account', 'catalog']
        if dnssec:
            # Extend the list. The previous chained assignment
            # (`required_fields = required_fields = [...]`) replaced it, so
            # none of the fields collected above were checked any more.
            required_fields = required_fields + ['dnssec', 'edited_serial']
        self.assertNotEqual(example_com['serial'], 0)
        if not dnssec:
            self.assertNotIn('dnssec', example_com)
    elif is_recursor():
        required_fields = required_fields + ['recursion_desired', 'servers']

    for field in required_fields:
        self.assertIn(field, example_com)
104 def test_list_zones_with_dnssec(self
):
106 self
._test
_list
_zones
(True)
def test_list_zones_without_dnssec(self):
    """Run the shared zone-list check with its ``dnssec`` flag turned off."""
    self._test_list_zones(dnssec=False)
112 class AuthZonesHelperMixin(object):
113 def create_zone(self
, name
=None, expect_error
=None, **kwargs
):
115 name
= unique_zone_name()
119 "nameservers": ["ns1.example.com.", "ns2.example.com."]
121 for k
, v
in kwargs
.items():
126 if "zone" in payload
:
127 payload
["zone"] = payload
["zone"].replace("$NAME$", payload
["name"])
128 if "rrsets" in payload
:
129 payload
["rrsets"] = templated_rrsets(payload
["rrsets"], payload
["name"])
131 print("Create zone", name
, "with:", payload
)
132 r
= self
.session
.post(
133 self
.url("/api/v1/servers/localhost/zones"),
134 data
=json
.dumps(payload
),
135 headers
={"content-type": "application/json"})
138 self
.assertEqual(r
.status_code
, 422, r
.content
)
140 if expect_error
is True:
143 self
.assertIn(expect_error
, reply
["error"])
146 self
.assertEqual(r
.status_code
, 201, r
.content
)
149 return name
, payload
, reply
151 def get_zone(self
, api_zone_id
, expect_error
=None, **kwargs
):
152 print("GET zone", api_zone_id
, "args:", kwargs
)
153 r
= self
.session
.get(
154 self
.url("/api/v1/servers/localhost/zones/" + api_zone_id
),
159 print("reply", reply
)
162 self
.assertEqual(r
.status_code
, 422)
163 if expect_error
is True:
166 self
.assertIn(expect_error
, r
.json()['error'])
169 self
.assert_success_json(r
)
170 self
.assertEqual(r
.status_code
, 200)
174 def put_zone(self
, api_zone_id
, payload
, expect_error
=None):
175 print("PUT zone", api_zone_id
, "with:", payload
)
176 r
= self
.session
.put(
177 self
.url("/api/v1/servers/localhost/zones/" + api_zone_id
),
178 data
=json
.dumps(payload
),
179 headers
={'content-type': 'application/json'})
181 print("reply status code:", r
.status_code
)
183 self
.assertEqual(r
.status_code
, 422, r
.content
)
185 if expect_error
is True:
188 self
.assertIn(expect_error
, reply
['error'])
190 # expect success (no content)
191 self
.assertEqual(r
.status_code
, 204, r
.content
)
193 @unittest.skipIf(not is_auth(), "Not applicable")
194 class AuthZones(ApiTestCase
, AuthZonesHelperMixin
):
196 def test_create_zone(self
):
197 # soa_edit_api has a default, override with empty for this test
198 name
, payload
, data
= self
.create_zone(serial
=22, soa_edit_api
='')
199 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'edited_serial', 'soa_edit_api', 'soa_edit', 'account'):
200 self
.assertIn(k
, data
)
202 self
.assertEqual(data
[k
], payload
[k
])
203 # validate generated SOA
204 expected_soa
= "a.misconfigured.dns.server.invalid. hostmaster." + name
+ " " + \
205 str(payload
['serial']) + " 10800 3600 604800 3600"
207 get_first_rec(data
, name
, 'SOA')['content'],
211 if not is_auth_lmdb():
212 # Because we had confusion about dots, check that the DB is without dots.
213 dbrecs
= get_db_records(name
, 'SOA')
214 self
.assertEqual(dbrecs
[0]['content'], expected_soa
.replace('. ', ' '))
215 self
.assertNotEqual(data
['serial'], data
['edited_serial'])
217 def test_create_zone_with_soa_edit_api(self
):
218 # soa_edit_api wins over serial
219 name
, payload
, data
= self
.create_zone(soa_edit_api
='EPOCH', serial
=10)
220 for k
in ('soa_edit_api', ):
221 self
.assertIn(k
, data
)
223 self
.assertEqual(data
[k
], payload
[k
])
224 # generated EPOCH serial surely is > fixed serial we passed in
226 self
.assertGreater(data
['serial'], payload
['serial'])
227 soa_serial
= int(get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2])
228 self
.assertGreater(soa_serial
, payload
['serial'])
229 self
.assertEqual(soa_serial
, data
['serial'])
231 def test_create_zone_with_catalog(self
):
232 # soa_edit_api wins over serial
233 name
, payload
, data
= self
.create_zone(catalog
='catalog.invalid.', serial
=10)
235 for k
in ('catalog', ):
236 self
.assertIn(k
, data
)
238 self
.assertEqual(data
[k
], payload
[k
])
240 # check that the catalog is reflected in the /zones output (#13633)
241 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
242 self
.assert_success_json(r
)
244 domain
= [domain
for domain
in domains
if domain
['name'] == name
]
245 self
.assertEqual(len(domain
), 1)
247 self
.assertEqual(domain
["catalog"], "catalog.invalid.")
249 def test_create_zone_with_account(self
):
250 # soa_edit_api wins over serial
251 name
, payload
, data
= self
.create_zone(account
='anaccount', serial
=10, kind
='Master')
253 for k
in ('account', ):
254 self
.assertIn(k
, data
)
256 self
.assertEqual(data
[k
], payload
[k
])
258 # as we did not set a catalog in our request, check that the default catalog was applied
259 self
.assertEqual(data
['catalog'], "default-catalog.example.com.")
261 def test_create_zone_default_soa_edit_api(self
):
262 name
, payload
, data
= self
.create_zone()
264 self
.assertEqual(data
['soa_edit_api'], 'DEFAULT')
266 def test_create_zone_exists(self
):
267 name
, payload
, data
= self
.create_zone()
274 r
= self
.session
.post(
275 self
.url("/api/v1/servers/localhost/zones"),
276 data
=json
.dumps(payload
),
277 headers
={'content-type': 'application/json'})
278 self
.assertEqual(r
.status_code
, 409) # Conflict - already exists
280 def test_create_zone_with_soa_edit(self
):
281 name
, payload
, data
= self
.create_zone(soa_edit
='INCEPTION-INCREMENT', soa_edit_api
='SOA-EDIT-INCREASE')
283 self
.assertEqual(data
['soa_edit'], 'INCEPTION-INCREMENT')
284 self
.assertEqual(data
['soa_edit_api'], 'SOA-EDIT-INCREASE')
285 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
286 # These particular settings lead to the first serial set to YYYYMMDD01.
287 self
.assertEqual(soa_serial
[-2:], '01')
289 'changetype': 'replace',
295 "content": "127.0.0.1",
300 payload
= {'rrsets': [rrset
]}
302 self
.url("/api/v1/servers/localhost/zones/" + data
['id']),
303 data
=json
.dumps(payload
),
304 headers
={'content-type': 'application/json'})
305 data
= self
.get_zone(data
['id'])
306 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
307 self
.assertEqual(soa_serial
[-2:], '02')
309 def test_create_zone_with_records(self
):
310 name
= unique_zone_name()
316 "content": "4.3.2.1",
320 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
321 # check our record has appeared
322 self
.assertEqual(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
324 def test_create_zone_with_wildcard_records(self
):
325 name
= unique_zone_name()
331 "content": "4.3.2.1",
335 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
336 # check our record has appeared
337 self
.assertEqual(get_rrset(data
, rrset
['name'], 'A')['records'], rrset
['records'])
339 def test_create_zone_with_comments(self
):
340 name
= unique_zone_name()
344 "type": "soa", # test uppercasing of type, too.
347 "content": "blah blah and test a few non-ASCII chars: ö, €",
348 "modified_at": 11112,
356 "content": "2001:DB8::1",
360 "account": "test AAAA",
361 "content": "blah blah AAAA",
362 "modified_at": 11112,
370 "content": "\"test TXT\"",
379 "content": "192.0.2.1",
386 # No comments in LMDB
387 self
.create_zone(name
=name
, rrsets
=rrsets
, expect_error
="Hosting backend does not support editing comments.")
390 name
, _
, data
= self
.create_zone(name
=name
, rrsets
=rrsets
)
391 # NS records have been created
392 self
.assertEqual(len(data
['rrsets']), len(rrsets
) + 1)
393 # check our comment has appeared
394 self
.assertEqual(get_rrset(data
, name
, 'SOA')['comments'], rrsets
[0]['comments'])
395 self
.assertEqual(get_rrset(data
, name
, 'A')['comments'], [])
396 self
.assertEqual(get_rrset(data
, name
, 'TXT')['comments'], [])
397 self
.assertEqual(get_rrset(data
, name
, 'AAAA')['comments'], rrsets
[1]['comments'])
399 def test_create_zone_uncanonical_nameservers(self
):
400 name
= unique_zone_name()
404 'nameservers': ['uncanon.example.com']
407 r
= self
.session
.post(
408 self
.url("/api/v1/servers/localhost/zones"),
409 data
=json
.dumps(payload
),
410 headers
={'content-type': 'application/json'})
411 self
.assertEqual(r
.status_code
, 422)
412 self
.assertIn('Nameserver is not canonical', r
.json()['error'])
414 def test_create_auth_zone_no_name(self
):
415 name
= unique_zone_name()
421 r
= self
.session
.post(
422 self
.url("/api/v1/servers/localhost/zones"),
423 data
=json
.dumps(payload
),
424 headers
={'content-type': 'application/json'})
425 self
.assertEqual(r
.status_code
, 422)
426 self
.assertIn('is not canonical', r
.json()['error'])
428 def test_create_zone_with_custom_soa(self
):
429 name
= unique_zone_name()
430 content
= u
"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
433 "type": "soa", # test uppercasing of type, too.
440 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
], soa_edit_api
='')
441 self
.assertEqual(get_rrset(data
, name
, 'SOA')['records'], rrset
['records'])
442 if not is_auth_lmdb():
443 dbrecs
= get_db_records(name
, 'SOA')
444 self
.assertEqual(dbrecs
[0]['content'], content
.replace('. ', ' '))
446 def test_create_zone_double_dot(self
):
447 name
= 'test..' + unique_zone_name()
451 'nameservers': ['ns1.example.com.']
454 r
= self
.session
.post(
455 self
.url("/api/v1/servers/localhost/zones"),
456 data
=json
.dumps(payload
),
457 headers
={'content-type': 'application/json'})
458 self
.assertEqual(r
.status_code
, 422)
459 self
.assertIn('Unable to parse DNS Name', r
.json()['error'])
461 def test_create_zone_restricted_chars(self
):
462 name
= 'test:' + unique_zone_name() # : isn't good as a name.
466 'nameservers': ['ns1.example.com']
469 r
= self
.session
.post(
470 self
.url("/api/v1/servers/localhost/zones"),
471 data
=json
.dumps(payload
),
472 headers
={'content-type': 'application/json'})
473 self
.assertEqual(r
.status_code
, 422)
474 self
.assertIn('contains unsupported characters', r
.json()['error'])
476 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self
):
477 name
= unique_zone_name()
483 "content": "ns2.example.com.",
490 'nameservers': ['ns1.example.com.'],
494 r
= self
.session
.post(
495 self
.url("/api/v1/servers/localhost/zones"),
496 data
=json
.dumps(payload
),
497 headers
={'content-type': 'application/json'})
498 self
.assertEqual(r
.status_code
, 422)
499 self
.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r
.json()['error'])
501 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self
):
502 name
= unique_zone_name()
504 "name": 'subzone.'+name
,
508 "content": "ns2.example.com.",
515 'nameservers': ['ns1.example.com.'],
519 r
= self
.session
.post(
520 self
.url("/api/v1/servers/localhost/zones"),
521 data
=json
.dumps(payload
),
522 headers
={'content-type': 'application/json'})
523 self
.assert_success_json(r
)
525 def test_create_zone_with_symbols(self
):
526 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
527 name
= payload
['name']
528 expected_id
= name
.replace('/', '=2F')
529 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
530 self
.assertIn(k
, data
)
532 self
.assertEqual(data
[k
], payload
[k
])
533 self
.assertEqual(data
['id'], expected_id
)
534 if not is_auth_lmdb():
535 dbrecs
= get_db_records(name
, 'SOA')
536 self
.assertEqual(dbrecs
[0]['name'], name
.rstrip('.'))
538 def test_create_zone_with_nameservers_non_string(self
):
539 # ensure we don't crash
540 name
= unique_zone_name()
544 'nameservers': [{'a': 'ns1.example.com'}] # invalid
547 r
= self
.session
.post(
548 self
.url("/api/v1/servers/localhost/zones"),
549 data
=json
.dumps(payload
),
550 headers
={'content-type': 'application/json'})
551 self
.assertEqual(r
.status_code
, 422)
553 def test_create_zone_with_dnssec(self
):
555 Create a zone with "dnssec" set and see if a key was made.
557 name
= unique_zone_name()
558 name
, payload
, data
= self
.create_zone(dnssec
=True)
562 for k
in ('dnssec', ):
563 self
.assertIn(k
, data
)
565 self
.assertEqual(data
[k
], payload
[k
])
567 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
573 self
.assertEqual(r
.status_code
, 200)
574 self
.assertEqual(len(keys
), 1)
575 self
.assertEqual(keys
[0]['type'], 'Cryptokey')
576 self
.assertEqual(keys
[0]['active'], True)
577 self
.assertEqual(keys
[0]['keytype'], 'csk')
579 def test_create_zone_with_dnssec_disable_dnssec(self
):
581 Create a zone with "dnssec", then set "dnssec" to false and see if the
584 name
= unique_zone_name()
585 name
, payload
, data
= self
.create_zone(dnssec
=True)
587 self
.put_zone(name
, {'dnssec': False})
589 zoneinfo
= self
.get_zone(name
)
590 self
.assertEqual(zoneinfo
['dnssec'], False)
592 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
596 self
.assertEqual(r
.status_code
, 200)
597 self
.assertEqual(len(keys
), 0)
599 def test_create_zone_with_nsec3param(self
):
601 Create a zone with "nsec3param" set and see if the metadata was added.
603 name
= unique_zone_name()
604 nsec3param
= '1 0 100 aabbccddeeff'
605 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
)
609 for k
in ('dnssec', 'nsec3param'):
610 self
.assertIn(k
, data
)
612 self
.assertEqual(data
[k
], payload
[k
])
614 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3PARAM'))
620 self
.assertEqual(r
.status_code
, 200)
621 self
.assertEqual(len(data
['metadata']), 1)
622 self
.assertEqual(data
['kind'], 'NSEC3PARAM')
623 self
.assertEqual(data
['metadata'][0], nsec3param
)
625 def test_create_zone_with_nsec3narrow(self
):
627 Create a zone with "nsec3narrow" set and see if the metadata was added.
629 name
= unique_zone_name()
630 nsec3param
= '1 0 100 aabbccddeeff'
631 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
,
636 for k
in ('dnssec', 'nsec3param', 'nsec3narrow'):
637 self
.assertIn(k
, data
)
639 self
.assertEqual(data
[k
], payload
[k
])
641 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3NARROW'))
647 self
.assertEqual(r
.status_code
, 200)
648 self
.assertEqual(len(data
['metadata']), 1)
649 self
.assertEqual(data
['kind'], 'NSEC3NARROW')
650 self
.assertEqual(data
['metadata'][0], '1')
652 def test_create_zone_with_nsec3param_switch_to_nsec(self
):
654 Create a zone with "nsec3param", then remove the params
656 name
, payload
, data
= self
.create_zone(dnssec
=True,
657 nsec3param
='1 0 1 ab')
658 self
.put_zone(name
, {'nsec3param': ''})
660 data
= self
.get_zone(name
)
661 self
.assertEqual(data
['nsec3param'], '')
def test_create_zone_without_dnssec_unset_nsec3parm(self):
    """
    Clearing "nsec3param" on a zone created without DNSSEC must be accepted.
    """
    # create_zone returns (name, payload, reply); only the name is needed here.
    zone_name = self.create_zone(dnssec=False)[0]
    empty_nsec3param = {'nsec3param': ''}
    self.put_zone(zone_name, empty_nsec3param)
def test_create_zone_without_dnssec_set_nsec3parm(self):
    """
    Setting a real "nsec3param" on a zone created without DNSSEC must be rejected.
    """
    # create_zone returns (name, payload, reply); only the name is needed here.
    zone_name = self.create_zone(dnssec=False)[0]
    nsec3_update = {'nsec3param': '1 0 1 ab'}
    self.put_zone(zone_name, nsec3_update, expect_error=True)
677 def test_create_zone_dnssec_serial(self
):
679 Create a zone, then set and unset "dnssec", then check if the serial was increased
682 name
, payload
, data
= self
.create_zone()
684 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
685 self
.assertEqual(soa_serial
[-2:], '01')
687 self
.put_zone(name
, {'dnssec': True})
689 data
= self
.get_zone(name
)
690 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
691 self
.assertEqual(soa_serial
[-2:], '02')
693 self
.put_zone(name
, {'dnssec': False})
695 data
= self
.get_zone(name
)
696 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
697 self
.assertEqual(soa_serial
[-2:], '03')
699 def test_zone_absolute_url(self
):
701 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
704 self
.assertTrue(rdata
[0]['url'].startswith('/api/v'))
706 def test_create_zone_metadata(self
):
707 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
708 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
709 data
=json
.dumps(payload_metadata
))
711 self
.assertEqual(r
.status_code
, 201)
712 self
.assertEqual(rdata
["metadata"], payload_metadata
["metadata"])
714 def test_create_zone_metadata_kind(self
):
715 payload_metadata
= {"metadata": ["127.0.0.2"]}
716 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
717 data
=json
.dumps(payload_metadata
))
719 self
.assertEqual(r
.status_code
, 200)
720 self
.assertEqual(rdata
["metadata"], payload_metadata
["metadata"])
def test_create_protected_zone_metadata(self):
    """PUT to each protected metadata kind of example.com must answer 422."""
    protected_kinds = ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT")
    # The request body is the same for every kind, so serialise it once.
    body = json.dumps({"metadata": ["FOO", "BAR"]})
    for kind in protected_kinds:
        endpoint = self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % kind)
        response = self.session.put(endpoint, data=body)
        self.assertEqual(response.status_code, 422)
730 def test_retrieve_zone_metadata(self
):
731 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
732 self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
733 data
=json
.dumps(payload_metadata
))
734 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"))
736 self
.assertEqual(r
.status_code
, 200)
737 self
.assertIn(payload_metadata
, rdata
)
739 def test_delete_zone_metadata(self
):
740 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
741 self
.assertEqual(r
.status_code
, 204)
742 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
744 self
.assertEqual(r
.status_code
, 200)
745 self
.assertEqual(rdata
["metadata"], [])
747 def test_create_external_zone_metadata(self
):
748 payload_metadata
= {"metadata": ["My very important message"]}
749 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
750 data
=json
.dumps(payload_metadata
))
751 self
.assertEqual(r
.status_code
, 200)
753 self
.assertEqual(rdata
["metadata"], payload_metadata
["metadata"])
def test_create_metadata_in_non_existent_zone(self):
    """POSTing metadata to a zone name that is not hosted must answer 404."""
    endpoint = self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata")
    body = json.dumps({"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]})
    response = self.session.post(endpoint, data=body)
    self.assertEqual(response.status_code, 404)
    # Note: errors should probably contain json (see #5988)
    # self.assertIn('Could not find domain ', response.json()['error'])
763 def test_create_slave_zone(self
):
764 # Test that nameservers can be absent for slave zones.
765 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
766 for k
in ('name', 'masters', 'kind'):
767 self
.assertIn(k
, data
)
768 self
.assertEqual(data
[k
], payload
[k
])
769 print("payload:", payload
)
771 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
772 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
774 print("zonelist:", zonelist
)
775 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
776 # Also test that fetching the zone works.
777 data
= self
.get_zone(data
['id'])
778 print("zone (fetched):", data
)
779 for k
in ('name', 'masters', 'kind'):
780 self
.assertIn(k
, data
)
781 self
.assertEqual(data
[k
], payload
[k
])
782 self
.assertEqual(data
['serial'], 0)
783 self
.assertEqual(data
['rrsets'], [])
785 def test_create_consumer_zone(self
):
786 # Test that nameservers can be absent for consumer zones.
787 _
, payload
, data
= self
.create_zone(kind
='Consumer', nameservers
=None, masters
=['127.0.0.2'])
788 print("payload:", payload
)
790 # Because consumer zones don't get a SOA, we need to test that they'll show up in the zone list.
791 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
793 print("zonelist:", zonelist
)
794 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
795 # Also test that fetching the zone works.
796 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
798 print("zone (fetched):", data
)
799 for k
in ('name', 'masters', 'kind'):
800 self
.assertIn(k
, data
)
801 self
.assertEqual(data
[k
], payload
[k
])
802 self
.assertEqual(data
['serial'], 0)
803 self
.assertEqual(data
['rrsets'], [])
def test_create_consumer_zone_no_nameservers(self):
    """Creating a Consumer zone with a nameservers list must be refused."""
    rejected_request = {
        "kind": "Consumer",
        "nameservers": ["127.0.0.1"],
        "expect_error": "Nameservers MUST NOT be given for Consumer zones",
    }
    self.create_zone(**rejected_request)
809 def test_create_consumer_zone_no_rrsets(self
):
810 """rrsets must be absent for Consumer zones"""
816 "content": "ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600",
820 self
.create_zone(kind
="Consumer", nameservers
=None, rrsets
=rrsets
, expect_error
="Zone data MUST NOT be given for Consumer zones")
822 def test_find_zone_by_name(self
):
823 name
= 'foo/' + unique_zone_name()
824 name
, payload
, data
= self
.create_zone(name
=name
)
825 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones?zone=" + name
))
828 self
.assertEqual(data
[0]['name'], name
)
830 def test_delete_slave_zone(self
):
831 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
832 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
835 def test_delete_consumer_zone(self
):
836 name
, payload
, data
= self
.create_zone(kind
='Consumer', nameservers
=None, masters
=['127.0.0.2'])
837 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
840 def test_retrieve_slave_zone(self
):
841 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
842 print("payload:", payload
)
844 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/axfr-retrieve"))
846 print("status for axfr-retrieve:", data
)
847 self
.assertEqual(data
['result'], u
'Added retrieval request for \'' + payload
['name'] +
848 '\' from primary 127.0.0.2')
850 def test_notify_master_zone(self
):
851 name
, payload
, data
= self
.create_zone(kind
='Master')
852 print("payload:", payload
)
854 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/notify"))
856 print("status for notify:", data
)
857 self
.assertEqual(data
['result'], 'Notification queued')
859 def test_get_zone_with_symbols(self
):
860 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
861 name
= payload
['name']
862 zone_id
= (name
.replace('/', '=2F'))
863 data
= self
.get_zone(zone_id
)
864 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
865 self
.assertIn(k
, data
)
867 self
.assertEqual(data
[k
], payload
[k
])
869 def test_get_zone(self
):
870 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
872 example_com
= [domain
for domain
in domains
if domain
['name'] == u
'example.com.'][0]
873 data
= self
.get_zone(example_com
['id'])
874 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
875 self
.assertIn(k
, data
)
876 self
.assertEqual(data
['name'], 'example.com.')
878 def test_get_zone_rrset(self
):
879 rz
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
881 example_com
= [domain
for domain
in domains
if domain
['name'] == u
'example.com.'][0]
883 # verify single record from name that has a single record
884 data
= self
.get_zone(example_com
['id'], rrset_name
="host-18000.example.com.")
885 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
886 self
.assertIn(k
, data
)
887 self
.assertEqual(data
['rrsets'],
891 'name': 'host-18000.example.com.',
895 'content': '192.168.1.80',
905 # verify two RRsets from a name that has two types with one record each
906 powerdnssec_org
= [domain
for domain
in domains
if domain
['name'] == u
'powerdnssec.org.'][0]
907 data
= self
.get_zone(powerdnssec_org
['id'], rrset_name
="localhost.powerdnssec.org.")
908 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
909 self
.assertIn(k
, data
)
910 self
.assertEqual(sorted(data
['rrsets'], key
=operator
.itemgetter('type')),
914 'name': 'localhost.powerdnssec.org.',
918 'content': '127.0.0.1',
927 'name': 'localhost.powerdnssec.org.',
941 # verify one RRset with one record from a name that has two, then filtered by type
942 data
= self
.get_zone(powerdnssec_org
['id'], rrset_name
="localhost.powerdnssec.org.", rrset_type
="AAAA")
943 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'rrsets'):
944 self
.assertIn(k
, data
)
945 self
.assertEqual(data
['rrsets'],
949 'name': 'localhost.powerdnssec.org.',
963 def test_import_zone_broken(self
):
965 'name': 'powerdns-broken.com',
969 payload
['zone'] = """
970 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
971 flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
972 ;; WARNING: recursion requested but not available
974 ;; OPT PSEUDOSECTION:
975 ; EDNS: version: 0, flags:; udp: 1680
977 ;powerdns.com. IN SOA
980 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
981 powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
982 powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
983 powerdns-broken.com. 86400 IN A 82.94.213.34
984 powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
985 powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
986 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
988 r
= self
.session
.post(
989 self
.url("/api/v1/servers/localhost/zones"),
990 data
=json
.dumps(payload
),
991 headers
={'content-type': 'application/json'})
992 self
.assertEqual(r
.status_code
, 422)
994 def test_import_zone_axfr_outofzone(self
):
995 # Ensure we don't create out-of-zone records
997 'name': unique_zone_name(),
1001 payload
['zone'] = """
1002 %NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
1003 %NAME% 3600 IN NS powerdnssec2.ds9a.nl.
1004 example.org. 3600 IN AAAA 2001:888:2000:1d::2
1005 %NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
1006 """.replace('%NAME%', payload
['name'])
1007 r
= self
.session
.post(
1008 self
.url("/api/v1/servers/localhost/zones"),
1009 data
=json
.dumps(payload
),
1010 headers
={'content-type': 'application/json'})
1011 self
.assertEqual(r
.status_code
, 422)
1012 self
.assertEqual(r
.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
1014 def test_import_zone_axfr(self
):
1016 'name': 'powerdns.com.',
1019 'soa_edit_api': '', # turn off so exact SOA comparison works.
1021 payload
['zone'] = """
1022 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
1023 ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
1024 ;; WARNING: recursion requested but not available
1026 ;; OPT PSEUDOSECTION:
1027 ; EDNS: version: 0, flags:; udp: 1680
1028 ;; QUESTION SECTION:
1029 ;powerdns.com. IN SOA
1032 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
1033 powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
1034 powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
1035 powerdns.com. 86400 IN A 82.94.213.34
1036 powerdns.com. 3600 IN MX 0 xs.powerdns.com.
1037 powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
1038 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
1040 r
= self
.session
.post(
1041 self
.url("/api/v1/servers/localhost/zones"),
1042 data
=json
.dumps(payload
),
1043 headers
={'content-type': 'application/json'})
1044 self
.assert_success_json(r
)
1046 self
.assertIn('name', data
)
1050 {'content': 'powerdnssec1.ds9a.nl.'},
1051 {'content': 'powerdnssec2.ds9a.nl.'},
1054 {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
1057 {'content': '0 xs.powerdns.com.'},
1060 {'content': '82.94.213.34', 'name': 'powerdns.com.'},
1063 {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
1067 eq_zone_rrsets(data
['rrsets'], expected
)
1069 if not is_auth_lmdb():
1070 # check content in DB is stored WITHOUT trailing dot.
1071 dbrecs
= get_db_records(payload
['name'], 'NS')
1072 dbrec
= next((dbrec
for dbrec
in dbrecs
if dbrec
['content'].startswith('powerdnssec1')))
1073 self
.assertEqual(dbrec
['content'], 'powerdnssec1.ds9a.nl')
1075 def test_import_zone_bind(self
):
1077 'name': 'example.org.',
1080 'soa_edit_api': '', # turn off so exact SOA comparison works.
1082 payload
['zone'] = """
1083 $TTL 86400 ; 24 hours could have been written as 24h or 1d
1084 ; $TTL used for all RRs without explicit TTL value
1085 $ORIGIN example.org.
1086 @ 1D IN SOA ns1.example.org. hostmaster.example.org. (
1093 IN NS ns1.example.org. ; in the domain
1094 IN NS ns2.smokeyjoe.com. ; external to domain
1095 IN MX 10 mail.another.com. ; external mail provider
1096 ; server host definitions
1097 ns1 IN A 192.168.0.1 ;name server definition
1098 www IN A 192.168.0.2 ;web server definition
1099 ftp IN CNAME www.example.org. ;ftp server definition
1100 ; non server domain hosts
1101 bill IN A 192.168.0.3
1102 fred IN A 192.168.0.4
1104 r
= self
.session
.post(
1105 self
.url("/api/v1/servers/localhost/zones"),
1106 data
=json
.dumps(payload
),
1107 headers
={'content-type': 'application/json'})
1108 self
.assert_success_json(r
)
1110 self
.assertIn('name', data
)
1114 {'content': 'ns1.example.org.'},
1115 {'content': 'ns2.smokeyjoe.com.'},
1118 {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
1121 {'content': '10 mail.another.com.'},
1124 {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
1125 {'content': '192.168.0.2', 'name': 'www.example.org.'},
1126 {'content': '192.168.0.3', 'name': 'bill.example.org.'},
1127 {'content': '192.168.0.4', 'name': 'fred.example.org.'},
1130 {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
1134 eq_zone_rrsets(data
['rrsets'], expected
)
1136 def test_import_zone_bind_cname_apex(self
):
1138 'name': unique_zone_name(),
1142 payload
['zone'] = """
1144 @ IN SOA ns1.example.org. hostmaster.example.org. (2002022401 3H 15 1W 3H)
1145 @ IN NS ns1.example.org.
1146 @ IN NS ns2.smokeyjoe.com.
1147 @ IN CNAME www.example.org.
1148 """.replace('%NAME%', payload
['name'])
1149 r
= self
.session
.post(
1150 self
.url("/api/v1/servers/localhost/zones"),
1151 data
=json
.dumps(payload
),
1152 headers
={'content-type': 'application/json'})
1153 self
.assertEqual(r
.status_code
, 422)
1154 self
.assertIn('Conflicts with another RRset', r
.json()['error'])
1156 def test_export_zone_json(self
):
1157 name
, payload
, zone
= self
.create_zone(nameservers
=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api
='')
1159 r
= self
.session
.get(
1160 self
.url("/api/v1/servers/localhost/zones/" + name
+ "/export"),
1161 headers
={'accept': 'application/json;q=0.9,*/*;q=0.8'}
1163 self
.assert_success_json(r
)
1165 self
.assertIn('zone', data
)
1166 expected_data
= [name
+ '\t3600\tIN\tNS\tns1.foo.com.',
1167 name
+ '\t3600\tIN\tNS\tns2.foo.com.',
1168 name
+ '\t3600\tIN\tSOA\ta.misconfigured.dns.server.invalid. hostmaster.' + name
+
1169 ' 0 10800 3600 604800 3600']
1170 self
.assertCountEqual(data
['zone'].strip().split('\n'), expected_data
)
1172 def test_import_zone_consumer(self
):
1174 $NAME$ 1D IN SOA ns1.example.org. hostmaster.example.org. (
1182 self
.create_zone(kind
="Consumer", nameservers
=[], zone
=zonestring
, expect_error
="Zone data MUST NOT be given for Consumer zones")
1184 def test_export_zone_text(self
):
1185 name
, payload
, zone
= self
.create_zone(nameservers
=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api
='')
1187 r
= self
.session
.get(
1188 self
.url("/api/v1/servers/localhost/zones/" + name
+ "/export"),
1189 headers
={'accept': '*/*'}
1191 data
= r
.text
.strip().split("\n")
1192 expected_data
= [name
+ '\t3600\tIN\tNS\tns1.foo.com.',
1193 name
+ '\t3600\tIN\tNS\tns2.foo.com.',
1194 name
+ '\t3600\tIN\tSOA\ta.misconfigured.dns.server.invalid. hostmaster.' + name
+
1195 ' 0 10800 3600 604800 3600']
1196 self
.assertCountEqual(data
, expected_data
)
1198 def test_update_zone(self
):
1199 name
, payload
, zone
= self
.create_zone()
1200 name
= payload
['name']
1201 # update, set as Master and enable SOA-EDIT-API
1204 'masters': ['192.0.2.1', '192.0.2.2'],
1205 'catalog': 'catalog.invalid.',
1206 'soa_edit_api': 'EPOCH',
1209 self
.put_zone(name
, payload
)
1210 data
= self
.get_zone(name
)
1211 for k
in payload
.keys():
1212 self
.assertIn(k
, data
)
1213 self
.assertEqual(data
[k
], payload
[k
])
1214 # update, back to Native and empty(off)
1221 self
.put_zone(name
, payload
)
1222 data
= self
.get_zone(name
)
1223 for k
in payload
.keys():
1224 self
.assertIn(k
, data
)
1225 self
.assertEqual(data
[k
], payload
[k
])
1227 def test_zone_rr_update(self
):
1228 name
, payload
, zone
= self
.create_zone()
1229 # do a replace (= update)
1231 'changetype': 'replace',
1237 "content": "ns1.bar.com.",
1241 "content": "ns2-disabled.bar.com.",
1246 payload
= {'rrsets': [rrset
]}
1247 r
= self
.session
.patch(
1248 self
.url("/api/v1/servers/localhost/zones/" + name
),
1249 data
=json
.dumps(payload
),
1250 headers
={'content-type': 'application/json'})
1251 self
.assert_success(r
)
1252 # verify that (only) the new record is there
1253 data
= self
.get_zone(name
)
1254 self
.assertCountEqual(get_rrset(data
, name
, 'NS')['records'], rrset
['records'])
1256 def test_zone_rr_update_mx(self
):
1257 # Important to test with MX records, as they have a priority field, which must end up in the content field.
1258 name
, payload
, zone
= self
.create_zone()
1259 # do a replace (= update)
1261 'changetype': 'replace',
1267 "content": "10 mail.example.org.",
1272 payload
= {'rrsets': [rrset
]}
1273 r
= self
.session
.patch(
1274 self
.url("/api/v1/servers/localhost/zones/" + name
),
1275 data
=json
.dumps(payload
),
1276 headers
={'content-type': 'application/json'})
1277 self
.assert_success(r
)
1278 # verify that (only) the new record is there
1279 data
= self
.get_zone(name
)
1280 self
.assertEqual(get_rrset(data
, name
, 'MX')['records'], rrset
['records'])
1282 def test_zone_rr_update_invalid_mx(self
):
1283 name
, payload
, zone
= self
.create_zone()
1284 # do a replace (= update)
1286 'changetype': 'replace',
1292 "content": "10 mail@mx.example.org.",
1297 payload
= {'rrsets': [rrset
]}
1298 r
= self
.session
.patch(
1299 self
.url("/api/v1/servers/localhost/zones/" + name
),
1300 data
=json
.dumps(payload
),
1301 headers
={'content-type': 'application/json'})
1302 self
.assertEqual(r
.status_code
, 422)
1303 self
.assertIn('non-hostname content', r
.json()['error'])
1304 data
= self
.get_zone(name
)
1305 self
.assertIsNone(get_rrset(data
, name
, 'MX'))
1307 def test_zone_rr_update_opt(self
):
1308 name
, payload
, zone
= self
.create_zone()
1309 # do a replace (= update)
1311 'changetype': 'replace',
1322 payload
= {'rrsets': [rrset
]}
1323 r
= self
.session
.patch(
1324 self
.url("/api/v1/servers/localhost/zones/" + name
),
1325 data
=json
.dumps(payload
),
1326 headers
={'content-type': 'application/json'})
1327 self
.assertEqual(r
.status_code
, 422)
1328 self
.assertIn('OPT: invalid type given', r
.json()['error'])
1330 def test_zone_rr_update_multiple_rrsets(self
):
1331 name
, payload
, zone
= self
.create_zone()
1333 'changetype': 'replace',
1340 "content": "ns9999.example.com.",
1346 'changetype': 'replace',
1352 "content": "10 mx444.example.com.",
1357 payload
= {'rrsets': [rrset1
, rrset2
]}
1358 r
= self
.session
.patch(
1359 self
.url("/api/v1/servers/localhost/zones/" + name
),
1360 data
=json
.dumps(payload
),
1361 headers
={'content-type': 'application/json'})
1362 self
.assert_success(r
)
1363 # verify that all rrsets have been updated
1364 data
= self
.get_zone(name
)
1365 self
.assertEqual(get_rrset(data
, name
, 'NS')['records'], rrset1
['records'])
1366 self
.assertEqual(get_rrset(data
, name
, 'MX')['records'], rrset2
['records'])
1368 def test_zone_rr_update_duplicate_record(self
):
1369 name
, payload
, zone
= self
.create_zone()
1371 'changetype': 'replace',
1376 {"content": "ns9999.example.com.", "disabled": False},
1377 {"content": "ns9996.example.com.", "disabled": False},
1378 {"content": "ns9987.example.com.", "disabled": False},
1379 {"content": "ns9988.example.com.", "disabled": False},
1380 {"content": "ns9999.example.com.", "disabled": False},
1383 payload
= {'rrsets': [rrset
]}
1384 r
= self
.session
.patch(
1385 self
.url("/api/v1/servers/localhost/zones/" + name
),
1386 data
=json
.dumps(payload
),
1387 headers
={'content-type': 'application/json'})
1388 self
.assertEqual(r
.status_code
, 422)
1389 self
.assertIn('Duplicate record in RRset', r
.json()['error'])
1391 def test_zone_rr_update_duplicate_rrset(self
):
1392 name
, payload
, zone
= self
.create_zone()
1394 'changetype': 'replace',
1400 "content": "ns9999.example.com.",
1406 'changetype': 'replace',
1412 "content": "ns9998.example.com.",
1417 payload
= {'rrsets': [rrset1
, rrset2
]}
1418 r
= self
.session
.patch(
1419 self
.url("/api/v1/servers/localhost/zones/" + name
),
1420 data
=json
.dumps(payload
),
1421 headers
={'content-type': 'application/json'})
1422 self
.assertEqual(r
.status_code
, 422)
1423 self
.assertIn('Duplicate RRset', r
.json()['error'])
1425 def test_zone_rr_delete(self
):
1426 name
, payload
, zone
= self
.create_zone()
1427 # do a delete of all NS records (these are created with the zone)
1429 'changetype': 'delete',
1433 payload
= {'rrsets': [rrset
]}
1434 r
= self
.session
.patch(
1435 self
.url("/api/v1/servers/localhost/zones/" + name
),
1436 data
=json
.dumps(payload
),
1437 headers
={'content-type': 'application/json'})
1438 self
.assert_success(r
)
1439 # verify that the records are gone
1440 data
= self
.get_zone(name
)
1441 self
.assertIsNone(get_rrset(data
, name
, 'NS'))
1443 def test_zone_rr_update_rrset_combine_replace_and_delete(self
):
1444 name
, payload
, zone
= self
.create_zone()
1446 'changetype': 'delete',
1447 'name': 'sub.' + name
,
1451 'changetype': 'replace',
1452 'name': 'sub.' + name
,
1457 "content": "www.example.org.",
1462 payload
= {'rrsets': [rrset1
, rrset2
]}
1463 r
= self
.session
.patch(
1464 self
.url("/api/v1/servers/localhost/zones/" + name
),
1465 data
=json
.dumps(payload
),
1466 headers
={'content-type': 'application/json'})
1467 self
.assert_success(r
)
1468 # verify that (only) the new record is there
1469 data
= self
.get_zone(name
)
1470 self
.assertEqual(get_rrset(data
, 'sub.' + name
, 'CNAME')['records'], rrset2
['records'])
1472 def test_zone_disable_reenable(self
):
1473 # This also tests that SOA-EDIT-API works.
1474 name
, payload
, zone
= self
.create_zone(soa_edit_api
='EPOCH')
1475 # disable zone by disabling SOA
1477 'changetype': 'replace',
1483 "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
1488 payload
= {'rrsets': [rrset
]}
1489 r
= self
.session
.patch(
1490 self
.url("/api/v1/servers/localhost/zones/" + name
),
1491 data
=json
.dumps(payload
),
1492 headers
={'content-type': 'application/json'})
1493 self
.assert_success(r
)
1494 # check SOA serial has been edited
1495 data
= self
.get_zone(name
)
1496 soa_serial1
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1497 self
.assertNotEqual(soa_serial1
, '1')
1498 # make sure domain is still in zone list (disabled SOA!)
1499 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
1501 self
.assertEqual(len([domain
for domain
in domains
if domain
['name'] == name
]), 1)
1502 # sleep 1sec to ensure the EPOCH value changes for the next request
1504 # verify that modifying it still works
1505 rrset
['records'][0]['disabled'] = False
1506 payload
= {'rrsets': [rrset
]}
1507 r
= self
.session
.patch(
1508 self
.url("/api/v1/servers/localhost/zones/" + name
),
1509 data
=json
.dumps(payload
),
1510 headers
={'content-type': 'application/json'})
1511 self
.assert_success(r
)
1512 # check SOA serial has been edited again
1513 data
= self
.get_zone(name
)
1514 soa_serial2
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1515 self
.assertNotEqual(soa_serial2
, '1')
1516 self
.assertNotEqual(soa_serial2
, soa_serial1
)
1518 def test_zone_rr_update_out_of_zone(self
):
1519 name
, payload
, zone
= self
.create_zone()
1520 # replace with qname mismatch
1522 'changetype': 'replace',
1523 'name': 'not-in-zone.',
1528 "content": "ns1.bar.com.",
1533 payload
= {'rrsets': [rrset
]}
1534 r
= self
.session
.patch(
1535 self
.url("/api/v1/servers/localhost/zones/" + name
),
1536 data
=json
.dumps(payload
),
1537 headers
={'content-type': 'application/json'})
1538 self
.assertEqual(r
.status_code
, 422)
1539 self
.assertIn('out of zone', r
.json()['error'])
1541 def test_zone_rr_update_restricted_chars(self
):
1542 name
, payload
, zone
= self
.create_zone()
1543 # replace with a qname containing an unsupported character (':')
1545 'changetype': 'replace',
1546 'name': 'test:' + name
,
1551 "content": "ns1.bar.com.",
1556 payload
= {'rrsets': [rrset
]}
1557 r
= self
.session
.patch(
1558 self
.url("/api/v1/servers/localhost/zones/" + name
),
1559 data
=json
.dumps(payload
),
1560 headers
={'content-type': 'application/json'})
1561 self
.assertEqual(r
.status_code
, 422)
1562 self
.assertIn('contains unsupported characters', r
.json()['error'])
1564 def test_rrset_unknown_type(self
):
1565 name
, payload
, zone
= self
.create_zone()
1567 'changetype': 'replace',
1573 "content": "4.3.2.1",
1578 payload
= {'rrsets': [rrset
]}
1579 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1580 headers
={'content-type': 'application/json'})
1581 self
.assertEqual(r
.status_code
, 422)
1582 self
.assertIn('unknown type', r
.json()['error'])
1584 @parameterized.expand([
1587 def test_rrset_exclusive_and_other(self
, qtype
):
1588 name
, payload
, zone
= self
.create_zone()
1590 'changetype': 'replace',
1596 "content": "example.org.",
1601 payload
= {'rrsets': [rrset
]}
1602 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1603 headers
={'content-type': 'application/json'})
1604 self
.assertEqual(r
.status_code
, 422)
1605 self
.assertIn('Conflicts with pre-existing RRset', r
.json()['error'])
1607 @parameterized.expand([
1610 def test_rrset_other_and_exclusive(self
, qtype
):
1611 name
, payload
, zone
= self
.create_zone()
1613 'changetype': 'replace',
1614 'name': 'sub.'+name
,
1619 "content": "example.org.",
1624 payload
= {'rrsets': [rrset
]}
1625 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1626 headers
={'content-type': 'application/json'})
1627 self
.assert_success(r
)
1629 'changetype': 'replace',
1630 'name': 'sub.'+name
,
1635 "content": "1.2.3.4",
1640 payload
= {'rrsets': [rrset
]}
1641 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1642 headers
={'content-type': 'application/json'})
1643 self
.assertEqual(r
.status_code
, 422)
1644 self
.assertIn('Conflicts with pre-existing RRset', r
.json()['error'])
1646 @parameterized.expand([
1647 ('', 'SOA', ['ns1.example.org. test@example.org. 10 10800 3600 604800 3600', 'ns2.example.org. test@example.org. 10 10800 3600 604800 3600']),
1648 ('sub.', 'CNAME', ['01.example.org.', '02.example.org.']),
1650 def test_rrset_single_qtypes(self
, label
, qtype
, contents
):
1651 name
, payload
, zone
= self
.create_zone()
1653 'changetype': 'replace',
1654 'name': label
+ name
,
1659 "content": contents
[0],
1663 "content": contents
[1],
1668 payload
= {'rrsets': [rrset
]}
1669 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1670 headers
={'content-type': 'application/json'})
1671 self
.assertEqual(r
.status_code
, 422)
1672 self
.assertIn('IN ' + qtype
+ ' has more than one record', r
.json()['error'])
1674 def test_rrset_zone_apex(self
):
1675 name
, payload
, zone
= self
.create_zone()
1677 'changetype': 'replace',
1683 "content": 'ns1.example.org. test@example.org. 10 10800 3600 604800 3600',
1689 'changetype': 'replace',
1695 "content": 'example.com.',
1701 payload
= {'rrsets': [rrset1
, rrset2
]}
1702 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1703 headers
={'content-type': 'application/json'})
1704 self
.assert_success(r
) # user should be able to create DNAME at APEX as per RFC 6672 section 2.3
1706 @parameterized.expand([
1707 ('SOA', 'ns1.example.org. test@example.org. 10 10800 3600 604800 1800'),
1708 ('DNSKEY', '257 3 8 AwEAAb/+pXOZWYQ8mv9WM5dFva8WU9jcIUdDuEjldbyfnkQ/xlrJC5zAEfhYhrea3SmIPmMTDimLqbh3/4SMTNPTUF+9+U1vpNfIRTFadqsmuU9Fddz3JqCcYwEpWbReg6DJOeyu+9oBoIQkPxFyLtIXEPGlQzrynKubn04Cx83I6NfzDTraJT3jLHKeW5PVc1ifqKzHz5TXdHHTA7NkJAa0sPcZCoNE1LpnJI/wcUpRUiuQhoLFeT1E432GuPuZ7y+agElGj0NnBxEgnHrhrnZWUbULpRa/il+Cr5Taj988HqX9Xdm6FjcP4Lbuds/44U7U8du224Q8jTrZ57Yvj4VDQKc='),
1710 def test_only_at_apex(self
, qtype
, content
):
1711 name
, payload
, zone
= self
.create_zone(soa_edit_api
='')
1713 'changetype': 'replace',
1724 payload
= {'rrsets': [rrset
]}
1725 r
= self
.session
.patch(
1726 self
.url("/api/v1/servers/localhost/zones/" + name
),
1727 data
=json
.dumps(payload
),
1728 headers
={'content-type': 'application/json'})
1729 self
.assert_success(r
)
1730 # verify that the new record is there
1731 data
= self
.get_zone(name
)
1732 self
.assertEqual(get_rrset(data
, name
, qtype
)['records'], rrset
['records'])
1735 'changetype': 'replace',
1736 'name': 'sub.' + name
,
1746 payload
= {'rrsets': [rrset
]}
1747 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1748 headers
={'content-type': 'application/json'})
1749 self
.assertEqual(r
.status_code
, 422)
1750 self
.assertIn('only allowed at apex', r
.json()['error'])
1751 data
= self
.get_zone(name
)
1752 self
.assertIsNone(get_rrset(data
, 'sub.' + name
, qtype
))
1754 @parameterized.expand([
1755 ('DS', '44030 8 2 d4c3d5552b8679faeebc317e5f048b614b2e5f607dc57f1553182d49ab2179f7'),
1757 def test_not_allowed_at_apex(self
, qtype
, content
):
1758 name
, payload
, zone
= self
.create_zone()
1760 'changetype': 'replace',
1771 payload
= {'rrsets': [rrset
]}
1772 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1773 headers
={'content-type': 'application/json'})
1774 self
.assertEqual(r
.status_code
, 422)
1775 self
.assertIn('not allowed at apex', r
.json()['error'])
1776 data
= self
.get_zone(name
)
1777 self
.assertIsNone(get_rrset(data
, 'sub.' + name
, qtype
))
1780 'changetype': 'replace',
1781 'name': 'sub.' + name
,
1791 payload
= {'rrsets': [rrset
]}
1792 r
= self
.session
.patch(
1793 self
.url("/api/v1/servers/localhost/zones/" + name
),
1794 data
=json
.dumps(payload
),
1795 headers
={'content-type': 'application/json'})
1796 self
.assert_success(r
)
1797 # verify that the new record is there
1798 data
= self
.get_zone(name
)
1799 self
.assertEqual(get_rrset(data
, 'sub.' + name
, qtype
)['records'], rrset
['records'])
1801 def test_rr_svcb(self
):
1802 name
, payload
, zone
= self
.create_zone()
1804 'changetype': 'replace',
1805 'name': 'svcb.' + name
,
1810 "content": '40 . mandatory=alpn alpn=h2,h3 ipv4hint=192.0.2.1,192.0.2.2 ech="dG90YWxseSBib2d1cyBlY2hjb25maWcgdmFsdWU="',
1815 payload
= {'rrsets': [rrset
]}
1816 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1817 headers
={'content-type': 'application/json'})
1818 self
.assert_success(r
)
1820 def test_rrset_ns_dname_exclude(self
):
1821 name
, payload
, zone
= self
.create_zone()
1823 'changetype': 'replace',
1824 'name': 'delegation.'+name
,
1829 "content": "ns.example.org.",
1834 payload
= {'rrsets': [rrset
]}
1835 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1836 headers
={'content-type': 'application/json'})
1837 self
.assert_success(r
)
1839 'changetype': 'replace',
1840 'name': 'delegation.'+name
,
1845 "content": "example.com.",
1850 payload
= {'rrsets': [rrset
]}
1851 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1852 headers
={'content-type': 'application/json'})
1853 self
.assertEqual(r
.status_code
, 422)
1854 self
.assertIn('Cannot have both NS and DNAME except in zone apex', r
.json()['error'])
1856 ## FIXME: Enable this when it's time for it
1857 # def test_rrset_dname_nothing_under(self):
1858 # name, payload, zone = self.create_zone()
1860 # 'changetype': 'replace',
1861 # 'name': 'delegation.'+name,
1866 # "content": "example.com.",
1871 # payload = {'rrsets': [rrset]}
1872 # r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1873 # headers={'content-type': 'application/json'})
1874 # self.assert_success(r)
1876 # 'changetype': 'replace',
1877 # 'name': 'sub.delegation.'+name,
1882 # "content": "1.2.3.4",
1887 # payload = {'rrsets': [rrset]}
1888 # r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1889 # headers={'content-type': 'application/json'})
1890 # self.assertEqual(r.status_code, 422)
1891 # self.assertIn('You cannot have record(s) under CNAME/DNAME', r.json()['error'])
1893 def test_create_zone_with_leading_space(self
):
1894 # Actual regression.
1895 name
, payload
, zone
= self
.create_zone()
1897 'changetype': 'replace',
1903 "content": " 4.3.2.1",
1908 payload
= {'rrsets': [rrset
]}
1909 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1910 headers
={'content-type': 'application/json'})
1911 self
.assertEqual(r
.status_code
, 422)
1912 self
.assertIn('Not in expected format', r
.json()['error'])
1914 @unittest.skipIf(is_auth_lmdb(), "No out-of-zone storage in LMDB")
1915 def test_zone_rr_delete_out_of_zone(self
):
1916 name
, payload
, zone
= self
.create_zone()
1918 'changetype': 'delete',
1919 'name': 'not-in-zone.',
1922 payload
= {'rrsets': [rrset
]}
1923 r
= self
.session
.patch(
1924 self
.url("/api/v1/servers/localhost/zones/" + name
),
1925 data
=json
.dumps(payload
),
1926 headers
={'content-type': 'application/json'})
1928 self
.assert_success(r
) # succeed so users can fix their wrong, old data
def test_zone_delete(self):
    """Create a zone, DELETE it through the API, and inspect the reply.

    The DELETE request is expected to answer with status 204 and the
    response must not carry a Content-Type header.
    """
    zone_name, _payload, _zone = self.create_zone()
    zone_url = self.url("/api/v1/servers/localhost/zones/" + zone_name)
    response = self.session.delete(zone_url)
    # Status is checked first so a wrong code is reported before the header check.
    self.assertEqual(response.status_code, 204)
    self.assertNotIn('Content-Type', response.headers)
1936 def test_zone_comment_create(self
):
1937 name
, payload
, zone
= self
.create_zone()
1939 'changetype': 'replace',
1946 'content': 'blah blah',
1950 'content': 'blah blah bleh',
1954 payload
= {'rrsets': [rrset
]}
1955 r
= self
.session
.patch(
1956 self
.url("/api/v1/servers/localhost/zones/" + name
),
1957 data
=json
.dumps(payload
),
1958 headers
={'content-type': 'application/json'})
1960 self
.assert_error_json(r
) # No comments in LMDB
1963 self
.assert_success(r
)
1964 # make sure the comments have been set, and that the NS
1965 # records are still present
1966 data
= self
.get_zone(name
)
1967 serverset
= get_rrset(data
, name
, 'NS')
1969 self
.assertNotEqual(serverset
['records'], [])
1970 self
.assertNotEqual(serverset
['comments'], [])
1971 # verify that modified_at has been set by pdns
1972 self
.assertNotEqual([c
for c
in serverset
['comments']][0]['modified_at'], 0)
1973 # verify that TTL is correct (regression test)
1974 self
.assertEqual(serverset
['ttl'], 3600)
1976 def test_zone_comment_delete(self
):
1977 # Test: Delete ONLY comments.
1978 name
, payload
, zone
= self
.create_zone()
1980 'changetype': 'replace',
1985 payload
= {'rrsets': [rrset
]}
1986 r
= self
.session
.patch(
1987 self
.url("/api/v1/servers/localhost/zones/" + name
),
1988 data
=json
.dumps(payload
),
1989 headers
={'content-type': 'application/json'})
1990 self
.assert_success(r
)
1991 # make sure the NS records are still present
1992 data
= self
.get_zone(name
)
1993 serverset
= get_rrset(data
, name
, 'NS')
1995 self
.assertNotEqual(serverset
['records'], [])
1996 self
.assertEqual(serverset
['comments'], [])
1998 @unittest.skipIf(is_auth_lmdb(), "No comments in LMDB")
1999 def test_zone_comment_out_of_range_modified_at(self
):
2000 # Test that a comment whose modified_at is out of range is rejected
2001 name
, payload
, zone
= self
.create_zone()
2003 'changetype': 'replace',
2009 'content': 'oh hi there',
2010 'modified_at': '4294967297'
2014 payload
= {'rrsets': [rrset
]}
2015 r
= self
.session
.patch(
2016 self
.url("/api/v1/servers/localhost/zones/" + name
),
2017 data
=json
.dumps(payload
),
2018 headers
={'content-type': 'application/json'})
2019 self
.assertEqual(r
.status_code
, 422)
2020 self
.assertIn("Key 'modified_at' is out of range", r
.json()['error'])
2022 @unittest.skipIf(is_auth_lmdb(), "No comments in LMDB")
2023 def test_zone_comment_stay_intact(self
):
2024 # Test if comments on an rrset stay intact if the rrset is replaced
2025 name
, payload
, zone
= self
.create_zone()
2028 'changetype': 'replace',
2034 'content': 'oh hi there',
2039 payload
= {'rrsets': [rrset
]}
2040 r
= self
.session
.patch(
2041 self
.url("/api/v1/servers/localhost/zones/" + name
),
2042 data
=json
.dumps(payload
),
2043 headers
={'content-type': 'application/json'})
2044 self
.assert_success(r
)
2045 # replace rrset records
2047 'changetype': 'replace',
2053 "content": "ns1.bar.com.",
2058 payload2
= {'rrsets': [rrset2
]}
2059 r
= self
.session
.patch(
2060 self
.url("/api/v1/servers/localhost/zones/" + name
),
2061 data
=json
.dumps(payload2
),
2062 headers
={'content-type': 'application/json'})
2063 self
.assert_success(r
)
2064 # make sure the comments still exist
2065 data
= self
.get_zone(name
)
2066 serverset
= get_rrset(data
, name
, 'NS')
2068 self
.assertEqual(serverset
['records'], rrset2
['records'])
2069 self
.assertEqual(serverset
['comments'], rrset
['comments'])
2071 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2072 def test_search_rr_exact_zone(self
):
2073 name
= unique_zone_name()
2074 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
2075 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.')))
2076 self
.assert_success_json(r
)
2078 self
.assertCountEqual(r
.json(), [
2079 {u
'object_type': u
'zone', u
'name': name
, u
'zone_id': name
},
2080 {u
'content': u
'ns1.example.com.',
2081 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
2082 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
2083 {u
'content': u
'ns2.example.com.',
2084 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
2085 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
2086 {u
'content': u
'a.misconfigured.dns.server.invalid. hostmaster.'+name
+' 22 10800 3600 604800 3600',
2087 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
2088 u
'ttl': 3600, u
'type': u
'SOA', u
'name': name
},
2091 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2092 def test_search_rr_exact_zone_filter_type_zone(self
):
2093 name
= unique_zone_name()
2095 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
2096 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.') + "&object_type=" + data_type
))
2097 self
.assert_success_json(r
)
2099 self
.assertEqual(r
.json(), [
2100 {u
'object_type': u
'zone', u
'name': name
, u
'zone_id': name
},
2103 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2104 def test_search_rr_exact_zone_filter_type_record(self
):
2105 name
= unique_zone_name()
2106 data_type
= "record"
2107 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
2108 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.') + "&object_type=" + data_type
))
2109 self
.assert_success_json(r
)
2111 self
.assertCountEqual(r
.json(), [
2112 {u
'content': u
'ns1.example.com.',
2113 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
2114 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
2115 {u
'content': u
'ns2.example.com.',
2116 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
2117 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
2118 {u
'content': u
'a.misconfigured.dns.server.invalid. hostmaster.'+name
+' 22 10800 3600 604800 3600',
2119 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
2120 u
'ttl': 3600, u
'type': u
'SOA', u
'name': name
},
2123 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2124 def test_search_rr_substring(self
):
2125 name
= unique_zone_name()
2127 self
.create_zone(name
=name
)
2128 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
2129 self
.assert_success_json(r
)
2131 # should return zone, SOA, ns1, ns2
2132 self
.assertEqual(len(r
.json()), 4)
2134 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2135 def test_search_rr_case_insensitive(self
):
2136 name
= unique_zone_name()+'testsuffix.'
2137 self
.create_zone(name
=name
)
2138 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
2139 self
.assert_success_json(r
)
2141 # should return zone, SOA, ns1, ns2
2142 self
.assertEqual(len(r
.json()), 4)
2144 @unittest.skipIf(is_auth_lmdb(), "No search or comments in LMDB")
2145 def test_search_rr_comment(self
):
2146 name
= unique_zone_name()
2152 "content": "2001:DB8::1",
2156 "account": "test AAAA",
2158 "modified_at": 11112,
2161 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=rrsets
)
2162 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=blah"))
2163 self
.assert_success_json(r
)
2165 # should return the comment attached to the AAAA rrset
2166 self
.assertEqual(len(data
), 1)
2167 self
.assertEqual(data
[0]['object_type'], 'comment')
2168 self
.assertEqual(data
[0]['type'], 'AAAA')
2169 self
.assertEqual(data
[0]['name'], name
)
2170 self
.assertEqual(data
[0]['content'], rrsets
[0]['comments'][0]['content'])
2172 @unittest.skipIf(is_auth_lmdb(), "No search in LMDB")
2173 def test_search_after_rectify_with_ent(self
):
2174 name
= unique_zone_name()
2175 search
= name
.split('.')[0]
2177 "name": 'sub.sub.' + name
,
2181 "content": "4.3.2.1",
2185 self
.create_zone(name
=name
, rrsets
=[rrset
])
2186 pdnsutil_rectify(name
)
2187 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
2188 self
.assert_success_json(r
)
2190 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
2191 self
.assertEqual(len(r
.json()), 5)
2193 @unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
2194 def test_default_api_rectify_dnssec(self
):
2195 name
= unique_zone_name()
2198 "name": 'a.' + name
,
2202 "content": "2001:DB8::1",
2207 "name": 'b.' + name
,
2211 "content": "2001:DB8::2",
2216 self
.create_zone(name
=name
, rrsets
=rrsets
, dnssec
=True, nsec3param
='1 0 1 ab')
2217 dbrecs
= get_db_records(name
, 'AAAA')
2218 self
.assertIsNotNone(dbrecs
[0]['ordername'])
2220 def test_default_api_rectify_nodnssec(self
):
2221 """Without any DNSSEC settings, rectify should still add ENTs. Setup the zone
2222 so ENTs are necessary, and check for their existence using sdig.
2224 name
= unique_zone_name()
2227 "name": 'a.sub.' + name
,
2231 "content": "2001:DB8::1",
2236 "name": 'b.sub.' + name
,
2240 "content": "2001:DB8::2",
2245 self
.create_zone(name
=name
, rrsets
=rrsets
)
2246 # default-api-rectify is yes (by default). expect rectify to have happened.
2247 assert 'Rcode: 0 ' in sdig('sub.' + name
, 'TXT')
2249 @unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
2250 def test_override_api_rectify(self
):
2251 name
= unique_zone_name()
2252 search
= name
.split('.')[0]
2255 "name": 'a.' + name
,
2259 "content": "2001:DB8::1",
2264 "name": 'b.' + name
,
2268 "content": "2001:DB8::2",
2273 self
.create_zone(name
=name
, rrsets
=rrsets
, api_rectify
=False, dnssec
=True, nsec3param
='1 0 1 ab')
2274 dbrecs
= get_db_records(name
, 'AAAA')
2275 self
.assertIsNone(dbrecs
[0]['ordername'])
@unittest.skipIf(is_auth_lmdb(), "No get_db_records for LMDB")
def test_explicit_rectify_success(self):
    """Check that an explicit rectify request fills in the SOA ordername.

    The zone is created with api_rectify=False, and the test first asserts
    that the SOA record has no ordername. It then PUTs to the zone's
    /rectify endpoint, expects HTTP 200, and asserts that the SOA record
    now has an ordername.
    """
    # Plain tuple unpacking. The previous chained assignment
    # (`... = self.create_zone = self.create_zone(...)`) also rebound
    # self.create_zone to the returned tuple, shadowing the helper method.
    name, _, data = self.create_zone(api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
    dbrecs = get_db_records(name, 'SOA')
    self.assertIsNone(dbrecs[0]['ordername'])
    r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/rectify"))
    self.assertEqual(r.status_code, 200)
    dbrecs = get_db_records(name, 'SOA')
    self.assertIsNotNone(dbrecs[0]['ordername'])
def test_explicit_rectify_slave(self):
    """Check that a zone switched to kind=Slave can still be rectified.

    The zone is created with api_rectify=False, changed to kind 'Slave'
    via put_zone, and then rectified through the /rectify endpoint, which
    must answer with HTTP 200. The ordername check on the SOA record is
    skipped when is_auth_lmdb() is true.
    """
    # Some users want to move a zone to kind=Slave and then rectify, without a re-transfer.
    # Plain tuple unpacking. The previous chained assignment
    # (`... = self.create_zone = self.create_zone(...)`) also rebound
    # self.create_zone to the returned tuple, shadowing the helper method.
    name, _, data = self.create_zone(api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
    self.put_zone(data['id'], {'kind': 'Slave'})
    r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/rectify"))
    self.assertEqual(r.status_code, 200)
    if not is_auth_lmdb():
        dbrecs = get_db_records(name, 'SOA')
        self.assertIsNotNone(dbrecs[0]['ordername'])
2297 def test_cname_at_ent_place(self
):
2298 name
, payload
, zone
= self
.create_zone(dnssec
=True, api_rectify
=True)
2300 'changetype': 'replace',
2301 'name': 'sub2.sub1.' + name
,
2305 'content': "4.3.2.1",
2309 payload
= {'rrsets': [rrset
]}
2310 r
= self
.session
.patch(
2311 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
2312 data
=json
.dumps(payload
),
2313 headers
={'content-type': 'application/json'})
2314 self
.assertEqual(r
.status_code
, 204)
2316 'changetype': 'replace',
2317 'name': 'sub1.' + name
,
2321 'content': "www.example.org.",
2325 payload
= {'rrsets': [rrset
]}
2326 r
= self
.session
.patch(
2327 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
2328 data
=json
.dumps(payload
),
2329 headers
={'content-type': 'application/json'})
2330 self
.assertEqual(r
.status_code
, 204)
2332 def test_rrset_parameter_post_false(self
):
2333 name
= unique_zone_name()
2337 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
2339 r
= self
.session
.post(
2340 self
.url("/api/v1/servers/localhost/zones?rrsets=false"),
2341 data
=json
.dumps(payload
),
2342 headers
={'content-type': 'application/json'})
2344 self
.assert_success_json(r
)
2345 self
.assertEqual(r
.status_code
, 201)
2346 self
.assertEqual(r
.json().get('rrsets'), None)
def test_rrset_false_parameter(self):
    """Fetching a zone with rrsets="false" must yield no 'rrsets' entry."""
    zone_name = unique_zone_name()
    self.create_zone(kind='Native', name=zone_name)
    zone = self.get_zone(zone_name, rrsets="false")
    returned_rrsets = zone.get('rrsets')
    self.assertEqual(returned_rrsets, None)
def test_rrset_true_parameter(self):
    """Fetching a zone with rrsets="true" must return its rrsets.

    The freshly created Native zone is expected to carry exactly two.
    """
    zone_name = unique_zone_name()
    self.create_zone(kind='Native', name=zone_name)
    zone = self.get_zone(zone_name, rrsets="true")
    rrset_count = len(zone['rrsets'])
    self.assertEqual(rrset_count, 2)
2360 def test_wrong_rrset_parameter(self
):
2361 name
= unique_zone_name()
2362 self
.create_zone(name
=name
, kind
='Native')
2364 name
, rrsets
="foobar",
2365 expect_error
="'rrsets' request parameter value 'foobar' is not supported"
2368 def test_put_master_tsig_key_ids_non_existent(self
):
2369 name
= unique_zone_name()
2370 keyname
= unique_zone_name().split('.')[0]
2371 self
.create_zone(name
=name
, kind
='Native')
2373 'master_tsig_key_ids': [keyname
]
2375 self
.put_zone(name
, payload
, expect_error
='A TSIG key with the name')
2377 def test_put_slave_tsig_key_ids_non_existent(self
):
2378 name
= unique_zone_name()
2379 keyname
= unique_zone_name().split('.')[0]
2380 self
.create_zone(name
=name
, kind
='Native')
2382 'slave_tsig_key_ids': [keyname
]
2384 self
.put_zone(name
, payload
, expect_error
='A TSIG key with the name')
2386 def test_zone_replace_rrsets_basic(self
):
2387 """Basic test: all automatic modification is off, on replace the new rrsets are ingested as is."""
2388 name
, _
, _
= self
.create_zone(dnssec
=False, soa_edit
='', soa_edit_api
='')
2390 {'name': name
, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2391 {'name': name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2392 {'name': 'www.' + name
, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
2393 {'name': 'sub.' + name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}]},
2395 self
.put_zone(name
, {'rrsets': rrsets
})
2397 data
= self
.get_zone(name
)
2398 for rrset
in rrsets
:
2399 rrset
.setdefault('comments', [])
2400 for record
in rrset
['records']:
2401 record
.setdefault('disabled', False)
2402 assert_eq_rrsets(data
['rrsets'], rrsets
)
2404 def test_zone_replace_rrsets_dnssec(self
):
2405 """With dnssec: check automatic rectify is done"""
2406 name
, _
, _
= self
.create_zone(dnssec
=True)
2408 {'name': name
, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2409 {'name': name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2410 {'name': 'www.' + name
, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
2412 self
.put_zone(name
, {'rrsets': rrsets
})
2414 if not is_auth_lmdb():
2415 # lmdb: skip, no get_db_records implementations
2416 dbrecs
= get_db_records(name
, 'A')
2417 assert dbrecs
[0]['ordername'] is not None # default = rectify enabled
2419 def test_zone_replace_rrsets_with_soa_edit(self
):
2420 """SOA-EDIT was enabled before rrsets will be replaced"""
2421 name
, _
, _
= self
.create_zone(soa_edit
='INCEPTION-INCREMENT', soa_edit_api
='SOA-EDIT-INCREASE')
2423 {'name': name
, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2424 {'name': name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2425 {'name': 'www.' + name
, 'type': 'A', 'ttl': 3600, 'records': [{'content': '192.0.2.1'}]},
2426 {'name': 'sub.' + name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}]},
2428 self
.put_zone(name
, {'rrsets': rrsets
})
2430 data
= self
.get_zone(name
)
2431 soa
= [rrset
['records'][0]['content'] for rrset
in data
['rrsets'] if rrset
['type'] == 'SOA'][0]
2432 assert int(soa
.split()[2]) > 1 # serial is larger than what we sent
2434 def test_zone_replace_rrsets_no_soa_primary(self
):
2435 """Replace all RRsets but supply no SOA. Should fail."""
2436 name
, _
, _
= self
.create_zone()
2438 {'name': name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]}
2440 self
.put_zone(name
, {'rrsets': rrsets
}, expect_error
='Must give SOA record for zone when replacing all RR sets')
2442 @parameterized.expand([
2445 {'name': '$NAME$', 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2448 def test_zone_replace_rrsets_secondary(self
, expected_error
, rrsets
):
2450 Replace all RRsets in a SECONDARY zone.
2452 If no SOA is given, this should still succeed, also setting zone stale (but cannot assert this here).
2454 name
, _
, _
= self
.create_zone(kind
='Secondary', nameservers
=None, masters
=['127.0.0.2'])
2455 self
.put_zone(name
, {'rrsets': templated_rrsets(rrsets
, name
)}, expect_error
=expected_error
)
2457 @parameterized.expand([
2459 ("Modifying RRsets in Consumer zones is unsupported", [
2460 {'name': '$NAME$', 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2463 def test_zone_replace_rrsets_consumer(self
, expected_error
, rrsets
):
2464 name
, _
, _
= self
.create_zone(kind
='Consumer', nameservers
=None, masters
=['127.0.0.2'])
2465 self
.put_zone(name
, {'rrsets': templated_rrsets(rrsets
, name
)}, expect_error
=expected_error
)
2467 def test_zone_replace_rrsets_negative_ttl(self
):
2468 name
, _
, _
= self
.create_zone(dnssec
=False, soa_edit
='', soa_edit_api
='')
2470 {'name': name
, 'type': 'SOA', 'ttl': -1, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2472 self
.put_zone(name
, {'rrsets': rrsets
}, expect_error
="Key 'ttl' is not a positive Integer")
2474 @parameterized.expand([
2475 ("IN MX: non-hostname content", [{'name': '$NAME$', 'type': 'MX', 'ttl': 3600, 'records': [{"content": "10 mail@mx.example.org."}]}]),
2476 ("out of zone", [{'name': 'not-in-zone.', 'type': 'NS', 'ttl': 3600, 'records': [{"content": "ns1.example.org."}]}]),
2477 ("contains unsupported characters", [{'name': 'test:.$NAME$', 'type': 'NS', 'ttl': 3600, 'records': [{"content": "ns1.example.org."}]}]),
2478 ("unknown type", [{'name': '$NAME$', 'type': 'INVALID', 'ttl': 3600, 'records': [{"content": "192.0.2.1"}]}]),
2479 ("Conflicts with another RRset", [{'name': '$NAME$', 'type': 'CNAME', 'ttl': 3600, 'records': [{"content": "example.org."}]}]),
2481 def test_zone_replace_rrsets_invalid(self
, expected_error
, invalid_rrsets
):
2482 """Test validation of RRsets before replacing them"""
2483 name
, _
, _
= self
.create_zone(dnssec
=False, soa_edit
='', soa_edit_api
='')
2485 {'name': name
, 'type': 'SOA', 'ttl': 3600, 'records': [{'content': 'invalid. hostmaster.invalid. 1 10800 3600 604800 3600'}]},
2486 {'name': name
, 'type': 'NS', 'ttl': 3600, 'records': [{'content': 'ns1.example.org.'}, {'content': 'ns2.example.org.'}]},
2488 rrsets
= base_rrsets
+ templated_rrsets(invalid_rrsets
, name
)
2489 self
.put_zone(name
, {'rrsets': rrsets
}, expect_error
=expected_error
)
2492 @unittest.skipIf(not is_auth(), "Not applicable")
2493 class AuthRootZone(ApiTestCase
, AuthZonesHelperMixin
):
2496 super(AuthRootZone
, self
).setUp()
2497 # zone name is not unique, so delete the zone before each individual test.
2498 self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/=2E"))
2500 def test_create_zone(self
):
2501 name
, payload
, data
= self
.create_zone(name
='.', serial
=22, soa_edit_api
='')
2502 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
2503 self
.assertIn(k
, data
)
2505 self
.assertEqual(data
[k
], payload
[k
])
2506 # validate generated SOA
2507 rec
= get_first_rec(data
, '.', 'SOA')
2510 "a.misconfigured.dns.server.invalid. hostmaster. " + str(payload
['serial']) +
2511 " 10800 3600 604800 3600"
2513 # Regression test: verify zone list works
2514 zonelist
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones")).json()
2515 print("zonelist:", zonelist
)
2516 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
2517 # Also test that fetching the zone works.
2518 print("id:", data
['id'])
2519 self
.assertEqual(data
['id'], '=2E')
2520 data
= self
.get_zone(data
['id'])
2521 print("zone (fetched):", data
)
2522 for k
in ('name', 'kind'):
2523 self
.assertIn(k
, data
)
2524 self
.assertEqual(data
[k
], payload
[k
])
2525 self
.assertEqual(data
['rrsets'][0]['name'], '.')
2527 def test_update_zone(self
):
2528 name
, payload
, zone
= self
.create_zone(name
='.')
2530 # update, set as Master and enable SOA-EDIT-API
2533 'masters': ['192.0.2.1', '192.0.2.2'],
2534 'soa_edit_api': 'EPOCH',
2537 self
.put_zone(zone_id
, payload
)
2538 data
= self
.get_zone(zone_id
)
2539 for k
in payload
.keys():
2540 self
.assertIn(k
, data
)
2541 self
.assertEqual(data
[k
], payload
[k
])
2542 # update, back to Native and empty(off)
2548 self
.put_zone(zone_id
, payload
)
2549 data
= self
.get_zone(zone_id
)
2550 for k
in payload
.keys():
2551 self
.assertIn(k
, data
)
2552 self
.assertEqual(data
[k
], payload
[k
])
2555 @unittest.skipIf(not is_recursor(), "Not applicable")
2556 class RecursorZones(ApiTestCase
):
2558 def create_zone(self
, name
=None, kind
=None, rd
=False, servers
=None):
2560 name
= unique_zone_name()
2567 'recursion_desired': rd
2569 r
= self
.session
.post(
2570 self
.url("/api/v1/servers/localhost/zones"),
2571 data
=json
.dumps(payload
),
2572 headers
={'content-type': 'application/json'})
2573 self
.assert_success_json(r
)
2574 return payload
, r
.json()
def test_create_auth_zone(self):
    """Create a Native zone and check the reply echoes every field sent."""
    sent, received = self.create_zone(kind='Native')
    for field, sent_value in sent.items():
        self.assertEqual(received[field], sent_value)
2581 def test_create_zone_no_name(self
):
2585 'servers': ['8.8.8.8'],
2586 'recursion_desired': False,
2589 r
= self
.session
.post(
2590 self
.url("/api/v1/servers/localhost/zones"),
2591 data
=json
.dumps(payload
),
2592 headers
={'content-type': 'application/json'})
2593 self
.assertEqual(r
.status_code
, 422)
2594 self
.assertIn('is not canonical', r
.json()['error'])
def test_create_forwarded_zone(self):
    """Create a Forwarded zone (rd off) and compare the reply with the request."""
    sent, received = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
    # return values are normalized: the reply carries the server with ':53' appended
    servers = sent['servers']
    servers[0] = servers[0] + ':53'
    for field, sent_value in sent.items():
        self.assertEqual(received[field], sent_value)
def test_create_forwarded_rd_zone(self):
    """Create a Forwarded zone with rd on and compare the reply with the request."""
    sent, received = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
    # return values are normalized: the reply carries the server with ':53' appended
    servers = sent['servers']
    servers[0] = servers[0] + ':53'
    for field, sent_value in sent.items():
        self.assertEqual(received[field], sent_value)
def test_create_auth_zone_with_symbols(self):
    """Create a zone whose name contains '/' and check the returned id.

    The expected id is the zone name with every '/' replaced by '=2F'.
    """
    zone_name = 'foo/bar.' + unique_zone_name()
    sent, received = self.create_zone(name=zone_name, kind='Native')
    escaped_id = sent['name'].replace('/', '=2F')
    for field, sent_value in sent.items():
        self.assertEqual(received[field], sent_value)
    self.assertEqual(received['id'], escaped_id)
2617 def test_rename_auth_zone(self
):
2618 payload
, data
= self
.create_zone(kind
='Native')
2619 name
= payload
['name']
2622 'name': 'renamed-'+name
,
2624 'recursion_desired': False
2626 r
= self
.session
.put(
2627 self
.url("/api/v1/servers/localhost/zones/" + name
),
2628 data
=json
.dumps(payload
),
2629 headers
={'content-type': 'application/json'})
2630 self
.assert_success(r
)
2631 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + payload
['name'])).json()
2632 for k
in payload
.keys():
2633 self
.assertEqual(data
[k
], payload
[k
])
def test_zone_delete(self):
    """Delete a newly created zone; expect 204 and no Content-Type header."""
    sent, _created = self.create_zone(kind='Native')
    zone_url = self.url("/api/v1/servers/localhost/zones/" + sent['name'])
    response = self.session.delete(zone_url)
    self.assertEqual(response.status_code, 204)
    self.assertNotIn('Content-Type', response.headers)
2642 def test_search_rr_exact_zone(self
):
2643 name
= unique_zone_name()
2644 self
.create_zone(name
=name
, kind
='Native')
2645 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
))
2646 self
.assert_success_json(r
)
2648 self
.assertEqual(r
.json(), [{u
'type': u
'zone', u
'name': name
, u
'zone_id': name
}])
2650 def test_search_rr_substring(self
):
2651 name
= 'search-rr-zone.name.'
2652 self
.create_zone(name
=name
, kind
='Native')
2653 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
2654 self
.assert_success_json(r
)
2656 # should return zone, SOA
2657 self
.assertEqual(len(r
.json()), 2)
2659 @unittest.skipIf(not is_auth(), "Not applicable")
2660 class AuthZoneKeys(ApiTestCase
, AuthZonesHelperMixin
):
2662 def test_get_keys(self
):
2663 r
= self
.session
.get(
2664 self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2665 self
.assert_success_json(r
)
2667 self
.assertGreater(len(keys
), 0)
2669 key0
= deepcopy(keys
[0])
2673 u
'algorithm': u
'ECDSAP256SHA256',
2676 u
'type': u
'Cryptokey',
2681 self
.assertEqual(key0
, expected
)
2683 keydata
= keys
[0]['dnskey'].split()
2684 self
.assertEqual(len(keydata
), 4)
2686 def test_get_keys_with_cds(self
):
2687 payload_metadata
= {"type": "Metadata", "kind": "PUBLISH-CDS", "metadata": ["4"]}
2688 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./metadata"),
2689 data
=json
.dumps(payload_metadata
))
2691 self
.assertEqual(r
.status_code
, 201)
2692 self
.assertEqual(rdata
["metadata"], payload_metadata
["metadata"])
2694 r
= self
.session
.get(
2695 self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2696 self
.assert_success_json(r
)
2698 self
.assertGreater(len(keys
), 0)
2700 key0
= deepcopy(keys
[0])
2701 self
.assertEqual(len(key0
['cds']), 1)
2702 self
.assertIn(key0
['cds'][0], key0
['ds'])
2703 self
.assertEqual(key0
['cds'][0].split()[2], '4')
2708 u
'algorithm': u
'ECDSAP256SHA256',
2711 u
'type': u
'Cryptokey',
2716 self
.assertEqual(key0
, expected
)
2718 keydata
= keys
[0]['dnskey'].split()
2719 self
.assertEqual(len(keydata
), 4)
2721 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./metadata/PUBLISH-CDS"))
2722 self
.assertEqual(r
.status_code
, 204)