]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
1 from __future__
import print_function
5 from copy
import deepcopy
6 from pprint
import pprint
7 from test_helper
import ApiTestCase
, unique_zone_name
, is_auth
, is_recursor
, get_db_records
, pdnsutil_rectify
10 def get_rrset(data
, qname
, qtype
):
11 for rrset
in data
['rrsets']:
12 if rrset
['name'] == qname
and rrset
['type'] == qtype
:
17 def get_first_rec(data
, qname
, qtype
):
18 rrset
= get_rrset(data
, qname
, qtype
)
20 return rrset
['records'][0]
24 def eq_zone_rrsets(rrsets
, expected
):
27 for type_
, expected_records
in expected
.items():
29 data_got
[type_
] = set()
30 data_expected
[type_
] = set()
31 uses_name
= any(['name' in expected_record
for expected_record
in expected_records
])
32 # minify + convert received data
33 for rrset
in [rrset
for rrset
in rrsets
if rrset
['type'] == type_
]:
35 for r
in rrset
['records']:
36 data_got
[type_
].add((rrset
['name'] if uses_name
else '@', rrset
['type'], r
['content']))
37 # minify expected data
38 for r
in expected_records
:
39 data_expected
[type_
].add((r
['name'] if uses_name
else '@', type_
, r
['content']))
41 print("eq_zone_rrsets: got:")
43 print("eq_zone_rrsets: expected:")
46 assert data_got
== data_expected
, "%r != %r" % (data_got
, data_expected
)
49 class Zones(ApiTestCase
):
51 def test_list_zones(self
):
52 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
53 self
.assert_success_json(r
)
55 example_com
= [domain
for domain
in domains
if domain
['name'] in ('example.com', 'example.com.')]
56 self
.assertEquals(len(example_com
), 1)
57 example_com
= example_com
[0]
59 required_fields
= ['id', 'url', 'name', 'kind']
61 required_fields
= required_fields
+ ['masters', 'last_check', 'notified_serial', 'serial', 'account']
62 self
.assertNotEquals(example_com
['serial'], 0)
64 required_fields
= required_fields
+ ['recursion_desired', 'servers']
65 for field
in required_fields
:
66 self
.assertIn(field
, example_com
)
69 class AuthZonesHelperMixin(object):
70 def create_zone(self
, name
=None, **kwargs
):
72 name
= unique_zone_name()
76 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
78 for k
, v
in kwargs
.items():
83 print("sending", payload
)
84 r
= self
.session
.post(
85 self
.url("/api/v1/servers/localhost/zones"),
86 data
=json
.dumps(payload
),
87 headers
={'content-type': 'application/json'})
88 self
.assert_success_json(r
)
89 self
.assertEquals(r
.status_code
, 201)
92 return name
, payload
, reply
95 @unittest.skipIf(not is_auth(), "Not applicable")
96 class AuthZones(ApiTestCase
, AuthZonesHelperMixin
):
98 def test_create_zone(self
):
99 # soa_edit_api has a default, override with empty for this test
100 name
, payload
, data
= self
.create_zone(serial
=22, soa_edit_api
='')
101 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
102 self
.assertIn(k
, data
)
104 self
.assertEquals(data
[k
], payload
[k
])
105 # validate generated SOA
106 expected_soa
= "a.misconfigured.powerdns.server. hostmaster." + name
+ " " + \
107 str(payload
['serial']) + " 10800 3600 604800 3600"
109 get_first_rec(data
, name
, 'SOA')['content'],
112 # Because we had confusion about dots, check that the DB is without dots.
113 dbrecs
= get_db_records(name
, 'SOA')
114 self
.assertEqual(dbrecs
[0]['content'], expected_soa
.replace('. ', ' '))
116 def test_create_zone_with_soa_edit_api(self
):
117 # soa_edit_api wins over serial
118 name
, payload
, data
= self
.create_zone(soa_edit_api
='EPOCH', serial
=10)
119 for k
in ('soa_edit_api', ):
120 self
.assertIn(k
, data
)
122 self
.assertEquals(data
[k
], payload
[k
])
123 # generated EPOCH serial surely is > fixed serial we passed in
125 self
.assertGreater(data
['serial'], payload
['serial'])
126 soa_serial
= int(get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2])
127 self
.assertGreater(soa_serial
, payload
['serial'])
128 self
.assertEquals(soa_serial
, data
['serial'])
130 def test_create_zone_with_account(self
):
131 # soa_edit_api wins over serial
132 name
, payload
, data
= self
.create_zone(account
='anaccount', serial
=10)
134 for k
in ('account', ):
135 self
.assertIn(k
, data
)
137 self
.assertEquals(data
[k
], payload
[k
])
139 def test_create_zone_default_soa_edit_api(self
):
140 name
, payload
, data
= self
.create_zone()
142 self
.assertEquals(data
['soa_edit_api'], 'DEFAULT')
144 def test_create_zone_exists(self
):
145 name
, payload
, data
= self
.create_zone()
152 r
= self
.session
.post(
153 self
.url("/api/v1/servers/localhost/zones"),
154 data
=json
.dumps(payload
),
155 headers
={'content-type': 'application/json'})
156 self
.assertEquals(r
.status_code
, 409) # Conflict - already exists
158 def test_create_zone_with_soa_edit(self
):
159 name
, payload
, data
= self
.create_zone(soa_edit
='INCEPTION-INCREMENT', soa_edit_api
='SOA-EDIT-INCREASE')
161 self
.assertEquals(data
['soa_edit'], 'INCEPTION-INCREMENT')
162 self
.assertEquals(data
['soa_edit_api'], 'SOA-EDIT-INCREASE')
163 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
164 # These particular settings lead to the first serial set to YYYYMMDD01.
165 self
.assertEquals(soa_serial
[-2:], '01')
167 'changetype': 'replace',
173 "content": "127.0.0.1",
178 payload
= {'rrsets': [rrset
]}
180 self
.url("/api/v1/servers/localhost/zones/" + data
['id']),
181 data
=json
.dumps(payload
),
182 headers
={'content-type': 'application/json'})
183 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
185 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
186 self
.assertEquals(soa_serial
[-2:], '02')
188 def test_create_zone_with_records(self
):
189 name
= unique_zone_name()
195 "content": "4.3.2.1",
199 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
200 # check our record has appeared
201 self
.assertEquals(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
203 def test_create_zone_with_wildcard_records(self
):
204 name
= unique_zone_name()
210 "content": "4.3.2.1",
214 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
215 # check our record has appeared
216 self
.assertEquals(get_rrset(data
, rrset
['name'], 'A')['records'], rrset
['records'])
218 def test_create_zone_with_comments(self
):
219 name
= unique_zone_name()
223 "type": "soa", # test uppercasing of type, too.
226 "content": "blah blah",
227 "modified_at": 11112,
235 "content": "2001:DB8::1",
239 "account": "test AAAA",
240 "content": "blah blah AAAA",
241 "modified_at": 11112,
249 "content": "\"test TXT\"",
258 "content": "192.0.2.1",
263 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=rrsets
)
264 # NS records have been created
265 self
.assertEquals(len(data
['rrsets']), len(rrsets
) + 1)
266 # check our comment has appeared
267 self
.assertEquals(get_rrset(data
, name
, 'SOA')['comments'], rrsets
[0]['comments'])
268 self
.assertEquals(get_rrset(data
, name
, 'A')['comments'], [])
269 self
.assertEquals(get_rrset(data
, name
, 'TXT')['comments'], [])
270 self
.assertEquals(get_rrset(data
, name
, 'AAAA')['comments'], rrsets
[1]['comments'])
272 def test_create_zone_uncanonical_nameservers(self
):
273 name
= unique_zone_name()
277 'nameservers': ['uncanon.example.com']
280 r
= self
.session
.post(
281 self
.url("/api/v1/servers/localhost/zones"),
282 data
=json
.dumps(payload
),
283 headers
={'content-type': 'application/json'})
284 self
.assertEquals(r
.status_code
, 422)
285 self
.assertIn('Nameserver is not canonical', r
.json()['error'])
287 def test_create_auth_zone_no_name(self
):
288 name
= unique_zone_name()
294 r
= self
.session
.post(
295 self
.url("/api/v1/servers/localhost/zones"),
296 data
=json
.dumps(payload
),
297 headers
={'content-type': 'application/json'})
298 self
.assertEquals(r
.status_code
, 422)
299 self
.assertIn('is not canonical', r
.json()['error'])
301 def test_create_zone_with_custom_soa(self
):
302 name
= unique_zone_name()
303 content
= u
"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
306 "type": "soa", # test uppercasing of type, too.
313 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
], soa_edit_api
='')
314 self
.assertEquals(get_rrset(data
, name
, 'SOA')['records'], rrset
['records'])
315 dbrecs
= get_db_records(name
, 'SOA')
316 self
.assertEqual(dbrecs
[0]['content'], content
.replace('. ', ' '))
318 def test_create_zone_double_dot(self
):
319 name
= 'test..' + unique_zone_name()
323 'nameservers': ['ns1.example.com.']
326 r
= self
.session
.post(
327 self
.url("/api/v1/servers/localhost/zones"),
328 data
=json
.dumps(payload
),
329 headers
={'content-type': 'application/json'})
330 self
.assertEquals(r
.status_code
, 422)
331 self
.assertIn('Unable to parse DNS Name', r
.json()['error'])
333 def test_create_zone_restricted_chars(self
):
334 name
= 'test:' + unique_zone_name() # : isn't good as a name.
338 'nameservers': ['ns1.example.com']
341 r
= self
.session
.post(
342 self
.url("/api/v1/servers/localhost/zones"),
343 data
=json
.dumps(payload
),
344 headers
={'content-type': 'application/json'})
345 self
.assertEquals(r
.status_code
, 422)
346 self
.assertIn('contains unsupported characters', r
.json()['error'])
348 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self
):
349 name
= unique_zone_name()
355 "content": "ns2.example.com.",
362 'nameservers': ['ns1.example.com.'],
366 r
= self
.session
.post(
367 self
.url("/api/v1/servers/localhost/zones"),
368 data
=json
.dumps(payload
),
369 headers
={'content-type': 'application/json'})
370 self
.assertEquals(r
.status_code
, 422)
371 self
.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r
.json()['error'])
373 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self
):
374 name
= unique_zone_name()
376 "name": 'subzone.'+name
,
380 "content": "ns2.example.com.",
387 'nameservers': ['ns1.example.com.'],
391 r
= self
.session
.post(
392 self
.url("/api/v1/servers/localhost/zones"),
393 data
=json
.dumps(payload
),
394 headers
={'content-type': 'application/json'})
395 self
.assert_success_json(r
)
397 def test_create_zone_with_symbols(self
):
398 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
399 name
= payload
['name']
400 expected_id
= name
.replace('/', '=2F')
401 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
402 self
.assertIn(k
, data
)
404 self
.assertEquals(data
[k
], payload
[k
])
405 self
.assertEquals(data
['id'], expected_id
)
406 dbrecs
= get_db_records(name
, 'SOA')
407 self
.assertEqual(dbrecs
[0]['name'], name
.rstrip('.'))
409 def test_create_zone_with_nameservers_non_string(self
):
410 # ensure we don't crash
411 name
= unique_zone_name()
415 'nameservers': [{'a': 'ns1.example.com'}] # invalid
418 r
= self
.session
.post(
419 self
.url("/api/v1/servers/localhost/zones"),
420 data
=json
.dumps(payload
),
421 headers
={'content-type': 'application/json'})
422 self
.assertEquals(r
.status_code
, 422)
424 def test_create_zone_with_dnssec(self
):
426 Create a zone with "dnssec" set and see if a key was made.
428 name
= unique_zone_name()
429 name
, payload
, data
= self
.create_zone(dnssec
=True)
431 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
433 for k
in ('dnssec', ):
434 self
.assertIn(k
, data
)
436 self
.assertEquals(data
[k
], payload
[k
])
438 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
444 self
.assertEquals(r
.status_code
, 200)
445 self
.assertEquals(len(keys
), 1)
446 self
.assertEquals(keys
[0]['type'], 'Cryptokey')
447 self
.assertEquals(keys
[0]['active'], True)
448 self
.assertEquals(keys
[0]['keytype'], 'csk')
450 def test_create_zone_with_dnssec_disable_dnssec(self
):
452 Create a zone with "dnssec", then set "dnssec" to false and see if the
455 name
= unique_zone_name()
456 name
, payload
, data
= self
.create_zone(dnssec
=True)
458 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
459 data
=json
.dumps({'dnssec': False}))
460 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
464 self
.assertEquals(r
.status_code
, 200)
465 self
.assertEquals(zoneinfo
['dnssec'], False)
467 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
471 self
.assertEquals(r
.status_code
, 200)
472 self
.assertEquals(len(keys
), 0)
474 def test_create_zone_with_nsec3param(self
):
476 Create a zone with "nsec3param" set and see if the metadata was added.
478 name
= unique_zone_name()
479 nsec3param
= '1 0 500 aabbccddeeff'
480 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
)
482 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
484 for k
in ('dnssec', 'nsec3param'):
485 self
.assertIn(k
, data
)
487 self
.assertEquals(data
[k
], payload
[k
])
489 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3PARAM'))
495 self
.assertEquals(r
.status_code
, 200)
496 self
.assertEquals(len(data
['metadata']), 1)
497 self
.assertEquals(data
['kind'], 'NSEC3PARAM')
498 self
.assertEquals(data
['metadata'][0], nsec3param
)
500 def test_create_zone_with_nsec3narrow(self
):
502 Create a zone with "nsec3narrow" set and see if the metadata was added.
504 name
= unique_zone_name()
505 nsec3param
= '1 0 500 aabbccddeeff'
506 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
,
509 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
511 for k
in ('dnssec', 'nsec3param', 'nsec3narrow'):
512 self
.assertIn(k
, data
)
514 self
.assertEquals(data
[k
], payload
[k
])
516 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3NARROW'))
522 self
.assertEquals(r
.status_code
, 200)
523 self
.assertEquals(len(data
['metadata']), 1)
524 self
.assertEquals(data
['kind'], 'NSEC3NARROW')
525 self
.assertEquals(data
['metadata'][0], '1')
527 def test_create_zone_dnssec_serial(self
):
529 Create a zone set/unset "dnssec" and see if the serial was increased
532 name
= unique_zone_name()
533 name
, payload
, data
= self
.create_zone()
535 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
536 self
.assertEquals(soa_serial
[-2:], '01')
538 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
539 data
=json
.dumps({'dnssec': True}))
540 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
543 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
545 self
.assertEquals(r
.status_code
, 200)
546 self
.assertEquals(soa_serial
[-2:], '02')
548 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
549 data
=json
.dumps({'dnssec': False}))
550 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
553 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
555 self
.assertEquals(r
.status_code
, 200)
556 self
.assertEquals(soa_serial
[-2:], '03')
558 def test_zone_absolute_url(self
):
559 name
, payload
, data
= self
.create_zone()
560 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
563 self
.assertTrue(rdata
[0]['url'].startswith('/api/v'))
565 def test_create_zone_metadata(self
):
566 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
567 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
568 data
=json
.dumps(payload_metadata
))
570 self
.assertEquals(r
.status_code
, 201)
571 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
573 def test_create_zone_metadata_kind(self
):
574 payload_metadata
= {"metadata": ["127.0.0.2"]}
575 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
576 data
=json
.dumps(payload_metadata
))
578 self
.assertEquals(r
.status_code
, 200)
579 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
581 def test_create_protected_zone_metadata(self
):
582 # test whether it prevents modification of certain kinds
583 for k
in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
584 payload
= {"metadata": ["FOO", "BAR"]}
585 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k
),
586 data
=json
.dumps(payload
))
587 self
.assertEquals(r
.status_code
, 422)
589 def test_retrieve_zone_metadata(self
):
590 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
591 self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
592 data
=json
.dumps(payload_metadata
))
593 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"))
595 self
.assertEquals(r
.status_code
, 200)
596 self
.assertIn(payload_metadata
, rdata
)
598 def test_delete_zone_metadata(self
):
599 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
600 self
.assertEquals(r
.status_code
, 200)
601 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
603 self
.assertEquals(r
.status_code
, 200)
604 self
.assertEquals(rdata
["metadata"], [])
606 def test_create_external_zone_metadata(self
):
607 payload_metadata
= {"metadata": ["My very important message"]}
608 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
609 data
=json
.dumps(payload_metadata
))
610 self
.assertEquals(r
.status_code
, 200)
612 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
614 def test_create_metadata_in_non_existent_zone(self
):
615 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
616 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
617 data
=json
.dumps(payload_metadata
))
618 self
.assertEquals(r
.status_code
, 404)
619 # Note: errors should probably contain json (see #5988)
620 # self.assertIn('Could not find domain ', r.json()['error'])
622 def test_create_slave_zone(self
):
623 # Test that nameservers can be absent for slave zones.
624 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
625 for k
in ('name', 'masters', 'kind'):
626 self
.assertIn(k
, data
)
627 self
.assertEquals(data
[k
], payload
[k
])
628 print("payload:", payload
)
630 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
631 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
633 print("zonelist:", zonelist
)
634 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
635 # Also test that fetching the zone works.
636 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
638 print("zone (fetched):", data
)
639 for k
in ('name', 'masters', 'kind'):
640 self
.assertIn(k
, data
)
641 self
.assertEquals(data
[k
], payload
[k
])
642 self
.assertEqual(data
['serial'], 0)
643 self
.assertEqual(data
['rrsets'], [])
645 def test_find_zone_by_name(self
):
646 name
= 'foo/' + unique_zone_name()
647 name
, payload
, data
= self
.create_zone(name
=name
)
648 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones?zone=" + name
))
651 self
.assertEquals(data
[0]['name'], name
)
653 def test_delete_slave_zone(self
):
654 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
655 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
658 def test_retrieve_slave_zone(self
):
659 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
660 print("payload:", payload
)
662 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/axfr-retrieve"))
664 print("status for axfr-retrieve:", data
)
665 self
.assertEqual(data
['result'], u
'Added retrieval request for \'' + payload
['name'] +
666 '\' from master 127.0.0.2')
668 def test_notify_master_zone(self
):
669 name
, payload
, data
= self
.create_zone(kind
='Master')
670 print("payload:", payload
)
672 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/notify"))
674 print("status for notify:", data
)
675 self
.assertEqual(data
['result'], 'Notification queued')
677 def test_get_zone_with_symbols(self
):
678 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
679 name
= payload
['name']
680 zone_id
= (name
.replace('/', '=2F'))
681 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
))
683 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
684 self
.assertIn(k
, data
)
686 self
.assertEquals(data
[k
], payload
[k
])
688 def test_get_zone(self
):
689 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
691 example_com
= [domain
for domain
in domains
if domain
['name'] == u
'example.com.'][0]
692 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + example_com
['id']))
693 self
.assert_success_json(r
)
695 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
696 self
.assertIn(k
, data
)
697 self
.assertEquals(data
['name'], 'example.com.')
699 def test_import_zone_broken(self
):
701 payload
['zone'] = """
702 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
703 flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
704 ;; WARNING: recursion requested but not available
706 ;; OPT PSEUDOSECTION:
707 ; EDNS: version: 0, flags:; udp: 1680
709 ;powerdns.com. IN SOA
712 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
713 powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
714 powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
715 powerdns-broken.com. 86400 IN A 82.94.213.34
716 powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
717 powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
718 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
720 payload
['name'] = 'powerdns-broken.com.'
721 payload
['kind'] = 'Master'
722 payload
['nameservers'] = []
723 r
= self
.session
.post(
724 self
.url("/api/v1/servers/localhost/zones"),
725 data
=json
.dumps(payload
),
726 headers
={'content-type': 'application/json'})
727 self
.assertEquals(r
.status_code
, 422)
729 def test_import_zone_axfr_outofzone(self
):
730 # Ensure we don't create out-of-zone records
731 name
= unique_zone_name()
733 payload
['zone'] = """
734 NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
735 NAME 3600 IN NS powerdnssec2.ds9a.nl.
736 example.org. 3600 IN AAAA 2001:888:2000:1d::2
737 NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
738 """.replace('NAME', name
)
739 payload
['name'] = name
740 payload
['kind'] = 'Master'
741 payload
['nameservers'] = []
742 r
= self
.session
.post(
743 self
.url("/api/v1/servers/localhost/zones"),
744 data
=json
.dumps(payload
),
745 headers
={'content-type': 'application/json'})
746 self
.assertEquals(r
.status_code
, 422)
747 self
.assertEqual(r
.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
749 def test_import_zone_axfr(self
):
751 payload
['zone'] = """
752 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
753 ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
754 ;; WARNING: recursion requested but not available
756 ;; OPT PSEUDOSECTION:
757 ; EDNS: version: 0, flags:; udp: 1680
759 ;powerdns.com. IN SOA
762 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
763 powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
764 powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
765 powerdns.com. 86400 IN A 82.94.213.34
766 powerdns.com. 3600 IN MX 0 xs.powerdns.com.
767 powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
768 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
770 payload
['name'] = 'powerdns.com.'
771 payload
['kind'] = 'Master'
772 payload
['nameservers'] = []
773 payload
['soa_edit_api'] = '' # turn off so exact SOA comparison works.
774 r
= self
.session
.post(
775 self
.url("/api/v1/servers/localhost/zones"),
776 data
=json
.dumps(payload
),
777 headers
={'content-type': 'application/json'})
778 self
.assert_success_json(r
)
780 self
.assertIn('name', data
)
784 {'content': 'powerdnssec1.ds9a.nl.'},
785 {'content': 'powerdnssec2.ds9a.nl.'},
788 {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
791 {'content': '0 xs.powerdns.com.'},
794 {'content': '82.94.213.34', 'name': 'powerdns.com.'},
797 {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
801 eq_zone_rrsets(data
['rrsets'], expected
)
803 # check content in DB is stored WITHOUT trailing dot.
804 dbrecs
= get_db_records(payload
['name'], 'NS')
805 dbrec
= next((dbrec
for dbrec
in dbrecs
if dbrec
['content'].startswith('powerdnssec1')))
806 self
.assertEqual(dbrec
['content'], 'powerdnssec1.ds9a.nl')
808 def test_import_zone_bind(self
):
810 payload
['zone'] = """
811 $TTL 86400 ; 24 hours could have been written as 24h or 1d
812 ; $TTL used for all RRs without explicit TTL value
814 @ 1D IN SOA ns1.example.org. hostmaster.example.org. (
821 IN NS ns1.example.org. ; in the domain
822 IN NS ns2.smokeyjoe.com. ; external to domain
823 IN MX 10 mail.another.com. ; external mail provider
824 ; server host definitions
825 ns1 IN A 192.168.0.1 ;name server definition
826 www IN A 192.168.0.2 ;web server definition
827 ftp IN CNAME www.example.org. ;ftp server definition
828 ; non server domain hosts
829 bill IN A 192.168.0.3
830 fred IN A 192.168.0.4
832 payload
['name'] = 'example.org.'
833 payload
['kind'] = 'Master'
834 payload
['nameservers'] = []
835 payload
['soa_edit_api'] = '' # turn off so exact SOA comparison works.
836 r
= self
.session
.post(
837 self
.url("/api/v1/servers/localhost/zones"),
838 data
=json
.dumps(payload
),
839 headers
={'content-type': 'application/json'})
840 self
.assert_success_json(r
)
842 self
.assertIn('name', data
)
846 {'content': 'ns1.example.org.'},
847 {'content': 'ns2.smokeyjoe.com.'},
850 {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
853 {'content': '10 mail.another.com.'},
856 {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
857 {'content': '192.168.0.2', 'name': 'www.example.org.'},
858 {'content': '192.168.0.3', 'name': 'bill.example.org.'},
859 {'content': '192.168.0.4', 'name': 'fred.example.org.'},
862 {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
866 eq_zone_rrsets(data
['rrsets'], expected
)
868 def test_export_zone_json(self
):
869 name
, payload
, zone
= self
.create_zone(nameservers
=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api
='')
871 r
= self
.session
.get(
872 self
.url("/api/v1/servers/localhost/zones/" + name
+ "/export"),
873 headers
={'accept': 'application/json;q=0.9,*/*;q=0.8'}
875 self
.assert_success_json(r
)
877 self
.assertIn('zone', data
)
878 expected_data
= [name
+ '\t3600\tIN\tNS\tns1.foo.com.',
879 name
+ '\t3600\tIN\tNS\tns2.foo.com.',
880 name
+ '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name
+
881 ' 0 10800 3600 604800 3600']
882 self
.assertEquals(data
['zone'].strip().split('\n'), expected_data
)
884 def test_export_zone_text(self
):
885 name
, payload
, zone
= self
.create_zone(nameservers
=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api
='')
887 r
= self
.session
.get(
888 self
.url("/api/v1/servers/localhost/zones/" + name
+ "/export"),
889 headers
={'accept': '*/*'}
891 data
= r
.text
.strip().split("\n")
892 expected_data
= [name
+ '\t3600\tIN\tNS\tns1.foo.com.',
893 name
+ '\t3600\tIN\tNS\tns2.foo.com.',
894 name
+ '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name
+
895 ' 0 10800 3600 604800 3600']
896 self
.assertEquals(data
, expected_data
)
898 def test_update_zone(self
):
899 name
, payload
, zone
= self
.create_zone()
900 name
= payload
['name']
901 # update, set as Master and enable SOA-EDIT-API
904 'masters': ['192.0.2.1', '192.0.2.2'],
905 'soa_edit_api': 'EPOCH',
908 r
= self
.session
.put(
909 self
.url("/api/v1/servers/localhost/zones/" + name
),
910 data
=json
.dumps(payload
),
911 headers
={'content-type': 'application/json'})
912 self
.assert_success(r
)
913 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
914 for k
in payload
.keys():
915 self
.assertIn(k
, data
)
916 self
.assertEquals(data
[k
], payload
[k
])
917 # update, back to Native and empty(off)
923 r
= self
.session
.put(
924 self
.url("/api/v1/servers/localhost/zones/" + name
),
925 data
=json
.dumps(payload
),
926 headers
={'content-type': 'application/json'})
927 self
.assert_success(r
)
928 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
929 for k
in payload
.keys():
930 self
.assertIn(k
, data
)
931 self
.assertEquals(data
[k
], payload
[k
])
933 def test_zone_rr_update(self
):
934 name
, payload
, zone
= self
.create_zone()
935 # do a replace (= update)
937 'changetype': 'replace',
943 "content": "ns1.bar.com.",
947 "content": "ns2-disabled.bar.com.",
952 payload
= {'rrsets': [rrset
]}
953 r
= self
.session
.patch(
954 self
.url("/api/v1/servers/localhost/zones/" + name
),
955 data
=json
.dumps(payload
),
956 headers
={'content-type': 'application/json'})
957 self
.assert_success(r
)
958 # verify that (only) the new record is there
959 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
960 self
.assertEquals(get_rrset(data
, name
, 'NS')['records'], rrset
['records'])
962 def test_zone_rr_update_mx(self
):
963 # Important to test with MX records, as they have a priority field, which must end up in the content field.
964 name
, payload
, zone
= self
.create_zone()
965 # do a replace (= update)
967 'changetype': 'replace',
973 "content": "10 mail.example.org.",
978 payload
= {'rrsets': [rrset
]}
979 r
= self
.session
.patch(
980 self
.url("/api/v1/servers/localhost/zones/" + name
),
981 data
=json
.dumps(payload
),
982 headers
={'content-type': 'application/json'})
983 self
.assert_success(r
)
984 # verify that (only) the new record is there
985 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
986 self
.assertEquals(get_rrset(data
, name
, 'MX')['records'], rrset
['records'])
988 def test_zone_rr_update_invalid_mx(self
):
989 name
, payload
, zone
= self
.create_zone()
990 # do a replace (= update)
992 'changetype': 'replace',
998 "content": "10 mail@mx.example.org.",
1003 payload
= {'rrsets': [rrset
]}
1004 r
= self
.session
.patch(
1005 self
.url("/api/v1/servers/localhost/zones/" + name
),
1006 data
=json
.dumps(payload
),
1007 headers
={'content-type': 'application/json'})
1008 self
.assertEquals(r
.status_code
, 422)
1009 self
.assertIn('non-hostname content', r
.json()['error'])
1010 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1011 self
.assertIsNone(get_rrset(data
, name
, 'MX'))
1013 def test_zone_rr_update_opt(self
):
1014 name
, payload
, zone
= self
.create_zone()
1015 # do a replace (= update)
1017 'changetype': 'replace',
1028 payload
= {'rrsets': [rrset
]}
1029 r
= self
.session
.patch(
1030 self
.url("/api/v1/servers/localhost/zones/" + name
),
1031 data
=json
.dumps(payload
),
1032 headers
={'content-type': 'application/json'})
1033 self
.assertEquals(r
.status_code
, 422)
1034 self
.assertIn('OPT: invalid type given', r
.json()['error'])
1036 def test_zone_rr_update_multiple_rrsets(self
):
1037 name
, payload
, zone
= self
.create_zone()
1039 'changetype': 'replace',
1046 "content": "ns9999.example.com.",
1052 'changetype': 'replace',
1058 "content": "10 mx444.example.com.",
1063 payload
= {'rrsets': [rrset1
, rrset2
]}
1064 r
= self
.session
.patch(
1065 self
.url("/api/v1/servers/localhost/zones/" + name
),
1066 data
=json
.dumps(payload
),
1067 headers
={'content-type': 'application/json'})
1068 self
.assert_success(r
)
1069 # verify that all rrsets have been updated
1070 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1071 self
.assertEquals(get_rrset(data
, name
, 'NS')['records'], rrset1
['records'])
1072 self
.assertEquals(get_rrset(data
, name
, 'MX')['records'], rrset2
['records'])
1074 def test_zone_rr_update_duplicate_record(self
):
1075 name
, payload
, zone
= self
.create_zone()
1077 'changetype': 'replace',
1082 {"content": "ns9999.example.com.", "disabled": False},
1083 {"content": "ns9996.example.com.", "disabled": False},
1084 {"content": "ns9987.example.com.", "disabled": False},
1085 {"content": "ns9988.example.com.", "disabled": False},
1086 {"content": "ns9999.example.com.", "disabled": False},
1089 payload
= {'rrsets': [rrset
]}
1090 r
= self
.session
.patch(
1091 self
.url("/api/v1/servers/localhost/zones/" + name
),
1092 data
=json
.dumps(payload
),
1093 headers
={'content-type': 'application/json'})
1094 self
.assertEquals(r
.status_code
, 422)
1095 self
.assertIn('Duplicate record in RRset', r
.json()['error'])
1097 def test_zone_rr_update_duplicate_rrset(self
):
1098 name
, payload
, zone
= self
.create_zone()
1100 'changetype': 'replace',
1106 "content": "ns9999.example.com.",
1112 'changetype': 'replace',
1118 "content": "ns9998.example.com.",
1123 payload
= {'rrsets': [rrset1
, rrset2
]}
1124 r
= self
.session
.patch(
1125 self
.url("/api/v1/servers/localhost/zones/" + name
),
1126 data
=json
.dumps(payload
),
1127 headers
={'content-type': 'application/json'})
1128 self
.assertEquals(r
.status_code
, 422)
1129 self
.assertIn('Duplicate RRset', r
.json()['error'])
1131 def test_zone_rr_delete(self
):
1132 name
, payload
, zone
= self
.create_zone()
1133 # do a delete of all NS records (these are created with the zone)
1135 'changetype': 'delete',
1139 payload
= {'rrsets': [rrset
]}
1140 r
= self
.session
.patch(
1141 self
.url("/api/v1/servers/localhost/zones/" + name
),
1142 data
=json
.dumps(payload
),
1143 headers
={'content-type': 'application/json'})
1144 self
.assert_success(r
)
1145 # verify that the records are gone
1146 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1147 self
.assertIsNone(get_rrset(data
, name
, 'NS'))
1149 def test_zone_disable_reenable(self
):
1150 # This also tests that SOA-EDIT-API works.
1151 name
, payload
, zone
= self
.create_zone(soa_edit_api
='EPOCH')
1152 # disable zone by disabling SOA
1154 'changetype': 'replace',
1160 "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
1165 payload
= {'rrsets': [rrset
]}
1166 r
= self
.session
.patch(
1167 self
.url("/api/v1/servers/localhost/zones/" + name
),
1168 data
=json
.dumps(payload
),
1169 headers
={'content-type': 'application/json'})
1170 self
.assert_success(r
)
1171 # check SOA serial has been edited
1172 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1173 soa_serial1
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1174 self
.assertNotEquals(soa_serial1
, '1')
1175 # make sure domain is still in zone list (disabled SOA!)
1176 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
1178 self
.assertEquals(len([domain
for domain
in domains
if domain
['name'] == name
]), 1)
1179 # sleep 1sec to ensure the EPOCH value changes for the next request
1181 # verify that modifying it still works
1182 rrset
['records'][0]['disabled'] = False
1183 payload
= {'rrsets': [rrset
]}
1184 r
= self
.session
.patch(
1185 self
.url("/api/v1/servers/localhost/zones/" + name
),
1186 data
=json
.dumps(payload
),
1187 headers
={'content-type': 'application/json'})
1188 self
.assert_success(r
)
1189 # check SOA serial has been edited again
1190 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1191 soa_serial2
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1192 self
.assertNotEquals(soa_serial2
, '1')
1193 self
.assertNotEquals(soa_serial2
, soa_serial1
)
1195 def test_zone_rr_update_out_of_zone(self
):
1196 name
, payload
, zone
= self
.create_zone()
1197 # replace with qname mismatch
1199 'changetype': 'replace',
1200 'name': 'not-in-zone.',
1205 "content": "ns1.bar.com.",
1210 payload
= {'rrsets': [rrset
]}
1211 r
= self
.session
.patch(
1212 self
.url("/api/v1/servers/localhost/zones/" + name
),
1213 data
=json
.dumps(payload
),
1214 headers
={'content-type': 'application/json'})
1215 self
.assertEquals(r
.status_code
, 422)
1216 self
.assertIn('out of zone', r
.json()['error'])
1218 def test_zone_rr_update_restricted_chars(self
):
1219 name
, payload
, zone
= self
.create_zone()
1220 # replace with qname mismatch
1222 'changetype': 'replace',
1223 'name': 'test:' + name
,
1228 "content": "ns1.bar.com.",
1233 payload
= {'rrsets': [rrset
]}
1234 r
= self
.session
.patch(
1235 self
.url("/api/v1/servers/localhost/zones/" + name
),
1236 data
=json
.dumps(payload
),
1237 headers
={'content-type': 'application/json'})
1238 self
.assertEquals(r
.status_code
, 422)
1239 self
.assertIn('contains unsupported characters', r
.json()['error'])
1241 def test_rrset_unknown_type(self
):
1242 name
, payload
, zone
= self
.create_zone()
1244 'changetype': 'replace',
1250 "content": "4.3.2.1",
1255 payload
= {'rrsets': [rrset
]}
1256 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1257 headers
={'content-type': 'application/json'})
1258 self
.assertEquals(r
.status_code
, 422)
1259 self
.assertIn('unknown type', r
.json()['error'])
1261 def test_rrset_cname_and_other(self
):
1262 name
, payload
, zone
= self
.create_zone()
1264 'changetype': 'replace',
1270 "content": "example.org.",
1275 payload
= {'rrsets': [rrset
]}
1276 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1277 headers
={'content-type': 'application/json'})
1278 self
.assertEquals(r
.status_code
, 422)
1279 self
.assertIn('Conflicts with pre-existing non-CNAME RRset', r
.json()['error'])
1281 def test_rrset_other_and_cname(self
):
1282 name
, payload
, zone
= self
.create_zone()
1284 'changetype': 'replace',
1285 'name': 'sub.'+name
,
1290 "content": "example.org.",
1295 payload
= {'rrsets': [rrset
]}
1296 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1297 headers
={'content-type': 'application/json'})
1298 self
.assert_success(r
)
1300 'changetype': 'replace',
1301 'name': 'sub.'+name
,
1306 "content": "1.2.3.4",
1311 payload
= {'rrsets': [rrset
]}
1312 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1313 headers
={'content-type': 'application/json'})
1314 self
.assertEquals(r
.status_code
, 422)
1315 self
.assertIn('Conflicts with pre-existing CNAME RRset', r
.json()['error'])
1317 def test_rrset_multiple_cnames(self
):
1318 name
, payload
, zone
= self
.create_zone()
1320 'changetype': 'replace',
1321 'name': 'sub.'+name
,
1326 "content": "01.example.org.",
1330 "content": "02.example.org.",
1335 payload
= {'rrsets': [rrset
]}
1336 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1337 headers
={'content-type': 'application/json'})
1338 self
.assertEquals(r
.status_code
, 422)
1339 self
.assertIn('/CNAME has more than one record', r
.json()['error'])
1341 def test_create_zone_with_leading_space(self
):
1342 # Actual regression.
1343 name
, payload
, zone
= self
.create_zone()
1345 'changetype': 'replace',
1351 "content": " 4.3.2.1",
1356 payload
= {'rrsets': [rrset
]}
1357 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1358 headers
={'content-type': 'application/json'})
1359 self
.assertEquals(r
.status_code
, 422)
1360 self
.assertIn('Not in expected format', r
.json()['error'])
1362 def test_zone_rr_delete_out_of_zone(self
):
1363 name
, payload
, zone
= self
.create_zone()
1365 'changetype': 'delete',
1366 'name': 'not-in-zone.',
1369 payload
= {'rrsets': [rrset
]}
1370 r
= self
.session
.patch(
1371 self
.url("/api/v1/servers/localhost/zones/" + name
),
1372 data
=json
.dumps(payload
),
1373 headers
={'content-type': 'application/json'})
1375 self
.assert_success(r
) # succeed so users can fix their wrong, old data
1377 def test_zone_delete(self
):
1378 name
, payload
, zone
= self
.create_zone()
1379 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + name
))
1380 self
.assertEquals(r
.status_code
, 204)
1381 self
.assertNotIn('Content-Type', r
.headers
)
1383 def test_zone_comment_create(self
):
1384 name
, payload
, zone
= self
.create_zone()
1386 'changetype': 'replace',
1393 'content': 'blah blah',
1397 'content': 'blah blah bleh',
1401 payload
= {'rrsets': [rrset
]}
1402 r
= self
.session
.patch(
1403 self
.url("/api/v1/servers/localhost/zones/" + name
),
1404 data
=json
.dumps(payload
),
1405 headers
={'content-type': 'application/json'})
1406 self
.assert_success(r
)
1407 # make sure the comments have been set, and that the NS
1408 # records are still present
1409 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1410 serverset
= get_rrset(data
, name
, 'NS')
1412 self
.assertNotEquals(serverset
['records'], [])
1413 self
.assertNotEquals(serverset
['comments'], [])
1414 # verify that modified_at has been set by pdns
1415 self
.assertNotEquals([c
for c
in serverset
['comments']][0]['modified_at'], 0)
1416 # verify that TTL is correct (regression test)
1417 self
.assertEquals(serverset
['ttl'], 3600)
1419 def test_zone_comment_delete(self
):
1420 # Test: Delete ONLY comments.
1421 name
, payload
, zone
= self
.create_zone()
1423 'changetype': 'replace',
1428 payload
= {'rrsets': [rrset
]}
1429 r
= self
.session
.patch(
1430 self
.url("/api/v1/servers/localhost/zones/" + name
),
1431 data
=json
.dumps(payload
),
1432 headers
={'content-type': 'application/json'})
1433 self
.assert_success(r
)
1434 # make sure the NS records are still present
1435 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1436 serverset
= get_rrset(data
, name
, 'NS')
1438 self
.assertNotEquals(serverset
['records'], [])
1439 self
.assertEquals(serverset
['comments'], [])
1441 def test_zone_comment_stay_intact(self
):
1442 # Test if comments on an rrset stay intact if the rrset is replaced
1443 name
, payload
, zone
= self
.create_zone()
1446 'changetype': 'replace',
1452 'content': 'oh hi there',
1457 payload
= {'rrsets': [rrset
]}
1458 r
= self
.session
.patch(
1459 self
.url("/api/v1/servers/localhost/zones/" + name
),
1460 data
=json
.dumps(payload
),
1461 headers
={'content-type': 'application/json'})
1462 self
.assert_success(r
)
1463 # replace rrset records
1465 'changetype': 'replace',
1471 "content": "ns1.bar.com.",
1476 payload2
= {'rrsets': [rrset2
]}
1477 r
= self
.session
.patch(
1478 self
.url("/api/v1/servers/localhost/zones/" + name
),
1479 data
=json
.dumps(payload2
),
1480 headers
={'content-type': 'application/json'})
1481 self
.assert_success(r
)
1482 # make sure the comments still exist
1483 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1484 serverset
= get_rrset(data
, name
, 'NS')
1486 self
.assertEquals(serverset
['records'], rrset2
['records'])
1487 self
.assertEquals(serverset
['comments'], rrset
['comments'])
1489 def test_zone_auto_ptr_ipv4_create(self
):
1490 revzone
= '4.2.192.in-addr.arpa.'
1491 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1492 name
= unique_zone_name()
1498 "content": "192.2.4.44",
1503 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
1504 del rrset
['records'][0]['set-ptr']
1505 self
.assertEquals(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
1506 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1507 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1509 self
.assertEquals(revsets
, [{
1510 u
'name': u
'44.4.2.192.in-addr.arpa.',
1519 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1520 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1522 def test_zone_auto_ptr_ipv4_update(self
):
1523 revzone
= '0.2.192.in-addr.arpa.'
1524 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1525 name
, payload
, zone
= self
.create_zone()
1527 'changetype': 'replace',
1533 "content": '192.2.0.2',
1539 payload
= {'rrsets': [rrset
]}
1540 r
= self
.session
.patch(
1541 self
.url("/api/v1/servers/localhost/zones/" + name
),
1542 data
=json
.dumps(payload
),
1543 headers
={'content-type': 'application/json'})
1544 self
.assert_success(r
)
1545 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1546 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1548 self
.assertEquals(revsets
, [{
1549 u
'name': u
'2.0.2.192.in-addr.arpa.',
1558 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1559 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1561 def test_zone_auto_ptr_ipv6_update(self
):
1563 revzone
= '8.b.d.0.1.0.0.2.ip6.arpa.'
1564 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1565 name
, payload
, zone
= self
.create_zone()
1567 'changetype': 'replace',
1573 "content": '2001:DB8::bb:aa',
1579 payload
= {'rrsets': [rrset
]}
1580 r
= self
.session
.patch(
1581 self
.url("/api/v1/servers/localhost/zones/" + name
),
1582 data
=json
.dumps(payload
),
1583 headers
={'content-type': 'application/json'})
1584 self
.assert_success(r
)
1585 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1586 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1588 self
.assertEquals(revsets
, [{
1589 u
'name': u
'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
1598 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1599 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1601 def test_search_rr_exact_zone(self
):
1602 name
= unique_zone_name()
1603 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
1604 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.')))
1605 self
.assert_success_json(r
)
1607 self
.assertEquals(r
.json(), [
1608 {u
'object_type': u
'zone', u
'name': name
, u
'zone_id': name
},
1609 {u
'content': u
'ns1.example.com.',
1610 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1611 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1612 {u
'content': u
'ns2.example.com.',
1613 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1614 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1615 {u
'content': u
'a.misconfigured.powerdns.server. hostmaster.'+name
+' 22 10800 3600 604800 3600',
1616 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1617 u
'ttl': 3600, u
'type': u
'SOA', u
'name': name
},
1620 def test_search_rr_exact_zone_filter_type_zone(self
):
1621 name
= unique_zone_name()
1623 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
1624 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.') + "&object_type=" + data_type
))
1625 self
.assert_success_json(r
)
1627 self
.assertEquals(r
.json(), [
1628 {u
'object_type': u
'zone', u
'name': name
, u
'zone_id': name
},
1631 def test_search_rr_exact_zone_filter_type_record(self
):
1632 name
= unique_zone_name()
1633 data_type
= "record"
1634 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
1635 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.') + "&object_type=" + data_type
))
1636 self
.assert_success_json(r
)
1638 self
.assertEquals(r
.json(), [
1639 {u
'content': u
'ns1.example.com.',
1640 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1641 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1642 {u
'content': u
'ns2.example.com.',
1643 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1644 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1645 {u
'content': u
'a.misconfigured.powerdns.server. hostmaster.'+name
+' 22 10800 3600 604800 3600',
1646 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1647 u
'ttl': 3600, u
'type': u
'SOA', u
'name': name
},
1650 def test_search_rr_substring(self
):
1651 name
= unique_zone_name()
1653 self
.create_zone(name
=name
)
1654 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
1655 self
.assert_success_json(r
)
1657 # should return zone, SOA, ns1, ns2
1658 self
.assertEquals(len(r
.json()), 4)
def test_search_rr_case_insensitive(self):
    """Search with a differently-cased pattern and expect four hits.

    The zone name ends in the lowercase label 'testsuffix.', while the
    query string uses '*testSUFFIX*'.
    """
    name = unique_zone_name() + 'testsuffix.'
    self.create_zone(name=name)
    search_url = self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*")
    r = self.session.get(search_url)
    self.assert_success_json(r)
    # should return zone, SOA, ns1, ns2
    # assertEqual: the assertEquals alias is deprecated and gone in Python 3.12.
    self.assertEqual(len(r.json()), 4)
1669 def test_search_after_rectify_with_ent(self
):
1670 name
= unique_zone_name()
1671 search
= name
.split('.')[0]
1673 "name": 'sub.sub.' + name
,
1677 "content": "4.3.2.1",
1681 self
.create_zone(name
=name
, rrsets
=[rrset
])
1682 pdnsutil_rectify(name
)
1683 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
1684 self
.assert_success_json(r
)
1686 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
1687 self
.assertEquals(len(r
.json()), 5)
1689 def test_default_api_rectify(self
):
1690 name
= unique_zone_name()
1691 search
= name
.split('.')[0]
1694 "name": 'a.' + name
,
1698 "content": "2001:DB8::1",
1703 "name": 'b.' + name
,
1707 "content": "2001:DB8::2",
1712 self
.create_zone(name
=name
, rrsets
=rrsets
, dnssec
=True, nsec3param
='1 0 1 ab')
1713 dbrecs
= get_db_records(name
, 'AAAA')
1714 self
.assertIsNotNone(dbrecs
[0]['ordername'])
1716 def test_override_api_rectify(self
):
1717 name
= unique_zone_name()
1718 search
= name
.split('.')[0]
1721 "name": 'a.' + name
,
1725 "content": "2001:DB8::1",
1730 "name": 'b.' + name
,
1734 "content": "2001:DB8::2",
1739 self
.create_zone(name
=name
, rrsets
=rrsets
, api_rectify
=False, dnssec
=True, nsec3param
='1 0 1 ab')
1740 dbrecs
= get_db_records(name
, 'AAAA')
1741 self
.assertIsNone(dbrecs
[0]['ordername'])
1743 def test_cname_at_ent_place(self
):
1744 name
, payload
, zone
= self
.create_zone(api_rectify
=True)
1746 'changetype': 'replace',
1747 'name': 'sub2.sub1.' + name
,
1751 'content': "4.3.2.1",
1755 payload
= {'rrsets': [rrset
]}
1756 r
= self
.session
.patch(
1757 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
1758 data
=json
.dumps(payload
),
1759 headers
={'content-type': 'application/json'})
1760 self
.assertEquals(r
.status_code
, 204)
1762 'changetype': 'replace',
1763 'name': 'sub1.' + name
,
1767 'content': "www.example.org.",
1771 payload
= {'rrsets': [rrset
]}
1772 r
= self
.session
.patch(
1773 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
1774 data
=json
.dumps(payload
),
1775 headers
={'content-type': 'application/json'})
1776 self
.assertEquals(r
.status_code
, 204)
1778 def test_rrset_parameter_post_false(self
):
1779 name
= unique_zone_name()
1783 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
1785 r
= self
.session
.post(
1786 self
.url("/api/v1/servers/localhost/zones?rrsets=false"),
1787 data
=json
.dumps(payload
),
1788 headers
={'content-type': 'application/json'})
1790 self
.assert_success_json(r
)
1791 self
.assertEquals(r
.status_code
, 201)
1792 self
.assertEquals(r
.json().get('rrsets'), None)
def test_rrset_false_parameter(self):
    """Fetching a zone with ?rrsets=false yields JSON without an 'rrsets' key value."""
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name + "?rrsets=false")
    r = self.session.get(zone_url)
    self.assert_success_json(r)
    # .get() returns None when 'rrsets' is absent from the response body.
    self.assertIsNone(r.json().get('rrsets'))
def test_rrset_true_parameter(self):
    """Fetching a zone with ?rrsets=true yields exactly two rrsets."""
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name + "?rrsets=true")
    r = self.session.get(zone_url)
    self.assert_success_json(r)
    # assertEqual: the assertEquals alias is deprecated and gone in Python 3.12.
    self.assertEqual(len(r.json().get('rrsets')), 2)
def test_wrong_rrset_parameter(self):
    """An unsupported ?rrsets= value is rejected with 422 and an explanatory error."""
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    zone_url = self.url("/api/v1/servers/localhost/zones/" + name + "?rrsets=foobar")
    r = self.session.get(zone_url)
    # assertEqual: the assertEquals alias is deprecated and gone in Python 3.12.
    self.assertEqual(r.status_code, 422)
    self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
1817 def test_put_master_tsig_key_ids_non_existent(self
):
1818 name
= unique_zone_name()
1819 keyname
= unique_zone_name().split('.')[0]
1820 self
.create_zone(name
=name
, kind
='Native')
1822 'master_tsig_key_ids': [keyname
]
1824 r
= self
.session
.put(self
.url('/api/v1/servers/localhost/zones/' + name
),
1825 data
=json
.dumps(payload
),
1826 headers
={'content-type': 'application/json'})
1827 self
.assertEquals(r
.status_code
, 422)
1828 self
.assertIn('A TSIG key with the name', r
.json()['error'])
1830 def test_put_slave_tsig_key_ids_non_existent(self
):
1831 name
= unique_zone_name()
1832 keyname
= unique_zone_name().split('.')[0]
1833 self
.create_zone(name
=name
, kind
='Native')
1835 'slave_tsig_key_ids': [keyname
]
1837 r
= self
.session
.put(self
.url('/api/v1/servers/localhost/zones/' + name
),
1838 data
=json
.dumps(payload
),
1839 headers
={'content-type': 'application/json'})
1840 self
.assertEquals(r
.status_code
, 422)
1841 self
.assertIn('A TSIG key with the name', r
.json()['error'])
1844 @unittest.skipIf(not is_auth(), "Not applicable")
1845 class AuthRootZone(ApiTestCase
, AuthZonesHelperMixin
):
1848 super(AuthRootZone
, self
).setUp()
1849 # zone name is not unique, so delete the zone before each individual test.
1850 self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/=2E"))
1852 def test_create_zone(self
):
1853 name
, payload
, data
= self
.create_zone(name
='.', serial
=22, soa_edit_api
='')
1854 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
1855 self
.assertIn(k
, data
)
1857 self
.assertEquals(data
[k
], payload
[k
])
1858 # validate generated SOA
1859 rec
= get_first_rec(data
, '.', 'SOA')
1862 "a.misconfigured.powerdns.server. hostmaster. " + str(payload
['serial']) +
1863 " 10800 3600 604800 3600"
1865 # Regression test: verify zone list works
1866 zonelist
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones")).json()
1867 print("zonelist:", zonelist
)
1868 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
1869 # Also test that fetching the zone works.
1870 print("id:", data
['id'])
1871 self
.assertEquals(data
['id'], '=2E')
1872 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id'])).json()
1873 print("zone (fetched):", data
)
1874 for k
in ('name', 'kind'):
1875 self
.assertIn(k
, data
)
1876 self
.assertEquals(data
[k
], payload
[k
])
1877 self
.assertEqual(data
['rrsets'][0]['name'], '.')
1879 def test_update_zone(self
):
1880 name
, payload
, zone
= self
.create_zone(name
='.')
1882 # update, set as Master and enable SOA-EDIT-API
1885 'masters': ['192.0.2.1', '192.0.2.2'],
1886 'soa_edit_api': 'EPOCH',
1889 r
= self
.session
.put(
1890 self
.url("/api/v1/servers/localhost/zones/" + zone_id
),
1891 data
=json
.dumps(payload
),
1892 headers
={'content-type': 'application/json'})
1893 self
.assert_success(r
)
1894 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
)).json()
1895 for k
in payload
.keys():
1896 self
.assertIn(k
, data
)
1897 self
.assertEquals(data
[k
], payload
[k
])
1898 # update, back to Native and empty(off)
1904 r
= self
.session
.put(
1905 self
.url("/api/v1/servers/localhost/zones/" + zone_id
),
1906 data
=json
.dumps(payload
),
1907 headers
={'content-type': 'application/json'})
1908 self
.assert_success(r
)
1909 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
)).json()
1910 for k
in payload
.keys():
1911 self
.assertIn(k
, data
)
1912 self
.assertEquals(data
[k
], payload
[k
])
1915 @unittest.skipIf(not is_recursor(), "Not applicable")
1916 class RecursorZones(ApiTestCase
):
1918 def create_zone(self
, name
=None, kind
=None, rd
=False, servers
=None):
1920 name
= unique_zone_name()
1927 'recursion_desired': rd
1929 r
= self
.session
.post(
1930 self
.url("/api/v1/servers/localhost/zones"),
1931 data
=json
.dumps(payload
),
1932 headers
={'content-type': 'application/json'})
1933 self
.assert_success_json(r
)
1934 return payload
, r
.json()
1936 def test_create_auth_zone(self
):
1937 payload
, data
= self
.create_zone(kind
='Native')
1938 for k
in payload
.keys():
1939 self
.assertEquals(data
[k
], payload
[k
])
1941 def test_create_zone_no_name(self
):
1945 'servers': ['8.8.8.8'],
1946 'recursion_desired': False,
1949 r
= self
.session
.post(
1950 self
.url("/api/v1/servers/localhost/zones"),
1951 data
=json
.dumps(payload
),
1952 headers
={'content-type': 'application/json'})
1953 self
.assertEquals(r
.status_code
, 422)
1954 self
.assertIn('is not canonical', r
.json()['error'])
1956 def test_create_forwarded_zone(self
):
1957 payload
, data
= self
.create_zone(kind
='Forwarded', rd
=False, servers
=['8.8.8.8'])
1958 # return values are normalized
1959 payload
['servers'][0] += ':53'
1960 for k
in payload
.keys():
1961 self
.assertEquals(data
[k
], payload
[k
])
1963 def test_create_forwarded_rd_zone(self
):
1964 payload
, data
= self
.create_zone(name
='google.com.', kind
='Forwarded', rd
=True, servers
=['8.8.8.8'])
1965 # return values are normalized
1966 payload
['servers'][0] += ':53'
1967 for k
in payload
.keys():
1968 self
.assertEquals(data
[k
], payload
[k
])
def test_create_auth_zone_with_symbols(self):
    """Create a zone whose name contains '/' and check the returned id escapes it as '=2F'."""
    payload, data = self.create_zone(name='foo/bar.' + unique_zone_name(), kind='Native')
    expected_id = payload['name'].replace('/', '=2F')
    for key in payload:
        # assertEqual: the assertEquals alias is deprecated and gone in Python 3.12.
        self.assertEqual(data[key], payload[key])
    # NOTE(review): placed after the loop; original indentation is not visible — confirm.
    self.assertEqual(data['id'], expected_id)
1977 def test_rename_auth_zone(self
):
1978 payload
, data
= self
.create_zone(kind
='Native')
1979 name
= payload
['name']
1982 'name': 'renamed-'+name
,
1984 'recursion_desired': False
1986 r
= self
.session
.put(
1987 self
.url("/api/v1/servers/localhost/zones/" + name
),
1988 data
=json
.dumps(payload
),
1989 headers
={'content-type': 'application/json'})
1990 self
.assert_success(r
)
1991 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + payload
['name'])).json()
1992 for k
in payload
.keys():
1993 self
.assertEquals(data
[k
], payload
[k
])
1995 def test_zone_delete(self
):
1996 payload
, zone
= self
.create_zone(kind
='Native')
1997 name
= payload
['name']
1998 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + name
))
1999 self
.assertEquals(r
.status_code
, 204)
2000 self
.assertNotIn('Content-Type', r
.headers
)
def test_search_rr_exact_zone(self):
    """Searching for the exact zone name returns a single 'zone' entry for it."""
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    search_url = self.url("/api/v1/servers/localhost/search-data?q=" + name)
    r = self.session.get(search_url)
    self.assert_success_json(r)
    expected = [{u'type': u'zone', u'name': name, u'zone_id': name}]
    # assertEqual: the assertEquals alias is deprecated and gone in Python 3.12.
    self.assertEqual(r.json(), expected)
2010 def test_search_rr_substring(self
):
2011 name
= 'search-rr-zone.name.'
2012 self
.create_zone(name
=name
, kind
='Native')
2013 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
2014 self
.assert_success_json(r
)
2016 # should return zone, SOA
2017 self
.assertEquals(len(r
.json()), 2)
2019 @unittest.skipIf(not is_auth(), "Not applicable")
2020 class AuthZoneKeys(ApiTestCase
, AuthZonesHelperMixin
):
2022 def test_get_keys(self
):
2023 r
= self
.session
.get(
2024 self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2025 self
.assert_success_json(r
)
2027 self
.assertGreater(len(keys
), 0)
2029 key0
= deepcopy(keys
[0])
2033 u
'algorithm': u
'ECDSAP256SHA256',
2036 u
'type': u
'Cryptokey',
2040 self
.assertEquals(key0
, expected
)
2042 keydata
= keys
[0]['dnskey'].split()
2043 self
.assertEqual(len(keydata
), 4)