]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
1 from __future__
import print_function
5 from copy
import deepcopy
6 from pprint
import pprint
7 from test_helper
import ApiTestCase
, unique_zone_name
, is_auth
, is_recursor
, get_db_records
, pdnsutil_rectify
10 def get_rrset(data
, qname
, qtype
):
11 for rrset
in data
['rrsets']:
12 if rrset
['name'] == qname
and rrset
['type'] == qtype
:
17 def get_first_rec(data
, qname
, qtype
):
18 rrset
= get_rrset(data
, qname
, qtype
)
20 return rrset
['records'][0]
24 def eq_zone_rrsets(rrsets
, expected
):
27 for type_
, expected_records
in expected
.items():
29 data_got
[type_
] = set()
30 data_expected
[type_
] = set()
31 uses_name
= any(['name' in expected_record
for expected_record
in expected_records
])
32 # minify + convert received data
33 for rrset
in [rrset
for rrset
in rrsets
if rrset
['type'] == type_
]:
35 for r
in rrset
['records']:
36 data_got
[type_
].add((rrset
['name'] if uses_name
else '@', rrset
['type'], r
['content']))
37 # minify expected data
38 for r
in expected_records
:
39 data_expected
[type_
].add((r
['name'] if uses_name
else '@', type_
, r
['content']))
41 print("eq_zone_rrsets: got:")
43 print("eq_zone_rrsets: expected:")
46 assert data_got
== data_expected
, "%r != %r" % (data_got
, data_expected
)
49 class Zones(ApiTestCase
):
51 def test_list_zones(self
):
52 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
53 self
.assert_success_json(r
)
55 example_com
= [domain
for domain
in domains
if domain
['name'] in ('example.com', 'example.com.')]
56 self
.assertEquals(len(example_com
), 1)
57 example_com
= example_com
[0]
59 required_fields
= ['id', 'url', 'name', 'kind']
61 required_fields
= required_fields
+ ['masters', 'last_check', 'notified_serial', 'serial', 'account']
62 self
.assertNotEquals(example_com
['serial'], 0)
64 required_fields
= required_fields
+ ['recursion_desired', 'servers']
65 for field
in required_fields
:
66 self
.assertIn(field
, example_com
)
69 class AuthZonesHelperMixin(object):
70 def create_zone(self
, name
=None, **kwargs
):
72 name
= unique_zone_name()
76 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
78 for k
, v
in kwargs
.items():
83 print("sending", payload
)
84 r
= self
.session
.post(
85 self
.url("/api/v1/servers/localhost/zones"),
86 data
=json
.dumps(payload
),
87 headers
={'content-type': 'application/json'})
88 self
.assert_success_json(r
)
89 self
.assertEquals(r
.status_code
, 201)
92 return name
, payload
, reply
95 @unittest.skipIf(not is_auth(), "Not applicable")
96 class AuthZones(ApiTestCase
, AuthZonesHelperMixin
):
98 def test_create_zone(self
):
99 # soa_edit_api has a default, override with empty for this test
100 name
, payload
, data
= self
.create_zone(serial
=22, soa_edit_api
='')
101 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
102 self
.assertIn(k
, data
)
104 self
.assertEquals(data
[k
], payload
[k
])
105 # validate generated SOA
106 expected_soa
= "a.misconfigured.powerdns.server. hostmaster." + name
+ " " + \
107 str(payload
['serial']) + " 10800 3600 604800 3600"
109 get_first_rec(data
, name
, 'SOA')['content'],
112 # Because we had confusion about dots, check that the DB is without dots.
113 dbrecs
= get_db_records(name
, 'SOA')
114 self
.assertEqual(dbrecs
[0]['content'], expected_soa
.replace('. ', ' '))
116 def test_create_zone_with_soa_edit_api(self
):
117 # soa_edit_api wins over serial
118 name
, payload
, data
= self
.create_zone(soa_edit_api
='EPOCH', serial
=10)
119 for k
in ('soa_edit_api', ):
120 self
.assertIn(k
, data
)
122 self
.assertEquals(data
[k
], payload
[k
])
123 # generated EPOCH serial surely is > fixed serial we passed in
125 self
.assertGreater(data
['serial'], payload
['serial'])
126 soa_serial
= int(get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2])
127 self
.assertGreater(soa_serial
, payload
['serial'])
128 self
.assertEquals(soa_serial
, data
['serial'])
130 def test_create_zone_with_account(self
):
131 # soa_edit_api wins over serial
132 name
, payload
, data
= self
.create_zone(account
='anaccount', serial
=10)
134 for k
in ('account', ):
135 self
.assertIn(k
, data
)
137 self
.assertEquals(data
[k
], payload
[k
])
139 def test_create_zone_default_soa_edit_api(self
):
140 name
, payload
, data
= self
.create_zone()
142 self
.assertEquals(data
['soa_edit_api'], 'DEFAULT')
144 def test_create_zone_exists(self
):
145 name
, payload
, data
= self
.create_zone()
152 r
= self
.session
.post(
153 self
.url("/api/v1/servers/localhost/zones"),
154 data
=json
.dumps(payload
),
155 headers
={'content-type': 'application/json'})
156 self
.assertEquals(r
.status_code
, 409) # Conflict - already exists
158 def test_create_zone_with_soa_edit(self
):
159 name
, payload
, data
= self
.create_zone(soa_edit
='INCEPTION-INCREMENT', soa_edit_api
='SOA-EDIT-INCREASE')
161 self
.assertEquals(data
['soa_edit'], 'INCEPTION-INCREMENT')
162 self
.assertEquals(data
['soa_edit_api'], 'SOA-EDIT-INCREASE')
163 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
164 # These particular settings lead to the first serial set to YYYYMMDD01.
165 self
.assertEquals(soa_serial
[-2:], '01')
167 'changetype': 'replace',
173 "content": "127.0.0.1",
178 payload
= {'rrsets': [rrset
]}
180 self
.url("/api/v1/servers/localhost/zones/" + data
['id']),
181 data
=json
.dumps(payload
),
182 headers
={'content-type': 'application/json'})
183 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
185 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
186 self
.assertEquals(soa_serial
[-2:], '02')
188 def test_create_zone_with_records(self
):
189 name
= unique_zone_name()
195 "content": "4.3.2.1",
199 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
200 # check our record has appeared
201 self
.assertEquals(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
203 def test_create_zone_with_wildcard_records(self
):
204 name
= unique_zone_name()
210 "content": "4.3.2.1",
214 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
215 # check our record has appeared
216 self
.assertEquals(get_rrset(data
, rrset
['name'], 'A')['records'], rrset
['records'])
218 def test_create_zone_with_comments(self
):
219 name
= unique_zone_name()
223 "type": "soa", # test uppercasing of type, too.
226 "content": "blah blah",
227 "modified_at": 11112,
235 "content": "2001:DB8::1",
239 "account": "test AAAA",
240 "content": "blah blah AAAA",
241 "modified_at": 11112,
249 "content": "\"test TXT\"",
258 "content": "192.0.2.1",
263 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=rrsets
)
264 # NS records have been created
265 self
.assertEquals(len(data
['rrsets']), len(rrsets
) + 1)
266 # check our comment has appeared
267 self
.assertEquals(get_rrset(data
, name
, 'SOA')['comments'], rrsets
[0]['comments'])
268 self
.assertEquals(get_rrset(data
, name
, 'A')['comments'], [])
269 self
.assertEquals(get_rrset(data
, name
, 'TXT')['comments'], [])
270 self
.assertEquals(get_rrset(data
, name
, 'AAAA')['comments'], rrsets
[1]['comments'])
272 def test_create_zone_uncanonical_nameservers(self
):
273 name
= unique_zone_name()
277 'nameservers': ['uncanon.example.com']
280 r
= self
.session
.post(
281 self
.url("/api/v1/servers/localhost/zones"),
282 data
=json
.dumps(payload
),
283 headers
={'content-type': 'application/json'})
284 self
.assertEquals(r
.status_code
, 422)
285 self
.assertIn('Nameserver is not canonical', r
.json()['error'])
287 def test_create_auth_zone_no_name(self
):
288 name
= unique_zone_name()
294 r
= self
.session
.post(
295 self
.url("/api/v1/servers/localhost/zones"),
296 data
=json
.dumps(payload
),
297 headers
={'content-type': 'application/json'})
298 self
.assertEquals(r
.status_code
, 422)
299 self
.assertIn('is not canonical', r
.json()['error'])
301 def test_create_zone_with_custom_soa(self
):
302 name
= unique_zone_name()
303 content
= u
"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
306 "type": "soa", # test uppercasing of type, too.
313 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
], soa_edit_api
='')
314 self
.assertEquals(get_rrset(data
, name
, 'SOA')['records'], rrset
['records'])
315 dbrecs
= get_db_records(name
, 'SOA')
316 self
.assertEqual(dbrecs
[0]['content'], content
.replace('. ', ' '))
318 def test_create_zone_double_dot(self
):
319 name
= 'test..' + unique_zone_name()
323 'nameservers': ['ns1.example.com.']
326 r
= self
.session
.post(
327 self
.url("/api/v1/servers/localhost/zones"),
328 data
=json
.dumps(payload
),
329 headers
={'content-type': 'application/json'})
330 self
.assertEquals(r
.status_code
, 422)
331 self
.assertIn('Unable to parse DNS Name', r
.json()['error'])
333 def test_create_zone_restricted_chars(self
):
334 name
= 'test:' + unique_zone_name() # : isn't good as a name.
338 'nameservers': ['ns1.example.com']
341 r
= self
.session
.post(
342 self
.url("/api/v1/servers/localhost/zones"),
343 data
=json
.dumps(payload
),
344 headers
={'content-type': 'application/json'})
345 self
.assertEquals(r
.status_code
, 422)
346 self
.assertIn('contains unsupported characters', r
.json()['error'])
348 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self
):
349 name
= unique_zone_name()
355 "content": "ns2.example.com.",
362 'nameservers': ['ns1.example.com.'],
366 r
= self
.session
.post(
367 self
.url("/api/v1/servers/localhost/zones"),
368 data
=json
.dumps(payload
),
369 headers
={'content-type': 'application/json'})
370 self
.assertEquals(r
.status_code
, 422)
371 self
.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r
.json()['error'])
373 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self
):
374 name
= unique_zone_name()
376 "name": 'subzone.'+name
,
380 "content": "ns2.example.com.",
387 'nameservers': ['ns1.example.com.'],
391 r
= self
.session
.post(
392 self
.url("/api/v1/servers/localhost/zones"),
393 data
=json
.dumps(payload
),
394 headers
={'content-type': 'application/json'})
395 self
.assert_success_json(r
)
397 def test_create_zone_with_symbols(self
):
398 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
399 name
= payload
['name']
400 expected_id
= name
.replace('/', '=2F')
401 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
402 self
.assertIn(k
, data
)
404 self
.assertEquals(data
[k
], payload
[k
])
405 self
.assertEquals(data
['id'], expected_id
)
406 dbrecs
= get_db_records(name
, 'SOA')
407 self
.assertEqual(dbrecs
[0]['name'], name
.rstrip('.'))
409 def test_create_zone_with_nameservers_non_string(self
):
410 # ensure we don't crash
411 name
= unique_zone_name()
415 'nameservers': [{'a': 'ns1.example.com'}] # invalid
418 r
= self
.session
.post(
419 self
.url("/api/v1/servers/localhost/zones"),
420 data
=json
.dumps(payload
),
421 headers
={'content-type': 'application/json'})
422 self
.assertEquals(r
.status_code
, 422)
424 def test_create_zone_with_dnssec(self
):
426 Create a zone with "dnssec" set and see if a key was made.
428 name
= unique_zone_name()
429 name
, payload
, data
= self
.create_zone(dnssec
=True)
431 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
433 for k
in ('dnssec', ):
434 self
.assertIn(k
, data
)
436 self
.assertEquals(data
[k
], payload
[k
])
438 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
444 self
.assertEquals(r
.status_code
, 200)
445 self
.assertEquals(len(keys
), 1)
446 self
.assertEquals(keys
[0]['type'], 'Cryptokey')
447 self
.assertEquals(keys
[0]['active'], True)
448 self
.assertEquals(keys
[0]['keytype'], 'csk')
450 def test_create_zone_with_dnssec_disable_dnssec(self
):
452 Create a zone with "dnssec", then set "dnssec" to false and see if the
455 name
= unique_zone_name()
456 name
, payload
, data
= self
.create_zone(dnssec
=True)
458 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
459 data
=json
.dumps({'dnssec': False}))
460 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
464 self
.assertEquals(r
.status_code
, 200)
465 self
.assertEquals(zoneinfo
['dnssec'], False)
467 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
471 self
.assertEquals(r
.status_code
, 200)
472 self
.assertEquals(len(keys
), 0)
474 def test_create_zone_with_nsec3param(self
):
476 Create a zone with "nsec3param" set and see if the metadata was added.
478 name
= unique_zone_name()
479 nsec3param
= '1 0 500 aabbccddeeff'
480 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
)
482 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
484 for k
in ('dnssec', 'nsec3param'):
485 self
.assertIn(k
, data
)
487 self
.assertEquals(data
[k
], payload
[k
])
489 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3PARAM'))
495 self
.assertEquals(r
.status_code
, 200)
496 self
.assertEquals(len(data
['metadata']), 1)
497 self
.assertEquals(data
['kind'], 'NSEC3PARAM')
498 self
.assertEquals(data
['metadata'][0], nsec3param
)
500 def test_create_zone_with_nsec3narrow(self
):
502 Create a zone with "nsec3narrow" set and see if the metadata was added.
504 name
= unique_zone_name()
505 nsec3param
= '1 0 500 aabbccddeeff'
506 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
,
509 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
511 for k
in ('dnssec', 'nsec3param', 'nsec3narrow'):
512 self
.assertIn(k
, data
)
514 self
.assertEquals(data
[k
], payload
[k
])
516 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3NARROW'))
522 self
.assertEquals(r
.status_code
, 200)
523 self
.assertEquals(len(data
['metadata']), 1)
524 self
.assertEquals(data
['kind'], 'NSEC3NARROW')
525 self
.assertEquals(data
['metadata'][0], '1')
527 def test_create_zone_dnssec_serial(self
):
529 Create a zone set/unset "dnssec" and see if the serial was increased
532 name
= unique_zone_name()
533 name
, payload
, data
= self
.create_zone()
535 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
536 self
.assertEquals(soa_serial
[-2:], '01')
538 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
539 data
=json
.dumps({'dnssec': True}))
540 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
543 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
545 self
.assertEquals(r
.status_code
, 200)
546 self
.assertEquals(soa_serial
[-2:], '02')
548 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
549 data
=json
.dumps({'dnssec': False}))
550 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
553 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
555 self
.assertEquals(r
.status_code
, 200)
556 self
.assertEquals(soa_serial
[-2:], '03')
558 def test_zone_absolute_url(self
):
559 name
, payload
, data
= self
.create_zone()
560 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
563 self
.assertTrue(rdata
[0]['url'].startswith('/api/v'))
565 def test_create_zone_metadata(self
):
566 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
567 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
568 data
=json
.dumps(payload_metadata
))
570 self
.assertEquals(r
.status_code
, 201)
571 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
573 def test_create_zone_metadata_kind(self
):
574 payload_metadata
= {"metadata": ["127.0.0.2"]}
575 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
576 data
=json
.dumps(payload_metadata
))
578 self
.assertEquals(r
.status_code
, 200)
579 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
581 def test_create_protected_zone_metadata(self
):
582 # test whether it prevents modification of certain kinds
583 for k
in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
584 payload
= {"metadata": ["FOO", "BAR"]}
585 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k
),
586 data
=json
.dumps(payload
))
587 self
.assertEquals(r
.status_code
, 422)
589 def test_retrieve_zone_metadata(self
):
590 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
591 self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
592 data
=json
.dumps(payload_metadata
))
593 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"))
595 self
.assertEquals(r
.status_code
, 200)
596 self
.assertIn(payload_metadata
, rdata
)
598 def test_delete_zone_metadata(self
):
599 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
600 self
.assertEquals(r
.status_code
, 200)
601 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
603 self
.assertEquals(r
.status_code
, 200)
604 self
.assertEquals(rdata
["metadata"], [])
606 def test_create_external_zone_metadata(self
):
607 payload_metadata
= {"metadata": ["My very important message"]}
608 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
609 data
=json
.dumps(payload_metadata
))
610 self
.assertEquals(r
.status_code
, 200)
612 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
614 def test_create_metadata_in_non_existent_zone(self
):
615 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
616 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
617 data
=json
.dumps(payload_metadata
))
618 self
.assertEquals(r
.status_code
, 404)
619 # Note: errors should probably contain json (see #5988)
620 # self.assertIn('Could not find domain ', r.json()['error'])
622 def test_create_slave_zone(self
):
623 # Test that nameservers can be absent for slave zones.
624 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
625 for k
in ('name', 'masters', 'kind'):
626 self
.assertIn(k
, data
)
627 self
.assertEquals(data
[k
], payload
[k
])
628 print("payload:", payload
)
630 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
631 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
633 print("zonelist:", zonelist
)
634 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
635 # Also test that fetching the zone works.
636 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
638 print("zone (fetched):", data
)
639 for k
in ('name', 'masters', 'kind'):
640 self
.assertIn(k
, data
)
641 self
.assertEquals(data
[k
], payload
[k
])
642 self
.assertEqual(data
['serial'], 0)
643 self
.assertEqual(data
['rrsets'], [])
645 def test_delete_slave_zone(self
):
646 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
647 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
650 def test_retrieve_slave_zone(self
):
651 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
652 print("payload:", payload
)
654 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/axfr-retrieve"))
656 print("status for axfr-retrieve:", data
)
657 self
.assertEqual(data
['result'], u
'Added retrieval request for \'' + payload
['name'] +
658 '\' from master 127.0.0.2')
660 def test_notify_master_zone(self
):
661 name
, payload
, data
= self
.create_zone(kind
='Master')
662 print("payload:", payload
)
664 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/notify"))
666 print("status for notify:", data
)
667 self
.assertEqual(data
['result'], 'Notification queued')
669 def test_get_zone_with_symbols(self
):
670 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
671 name
= payload
['name']
672 zone_id
= (name
.replace('/', '=2F'))
673 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
))
675 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
676 self
.assertIn(k
, data
)
678 self
.assertEquals(data
[k
], payload
[k
])
680 def test_get_zone(self
):
681 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
683 example_com
= [domain
for domain
in domains
if domain
['name'] == u
'example.com.'][0]
684 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + example_com
['id']))
685 self
.assert_success_json(r
)
687 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
688 self
.assertIn(k
, data
)
689 self
.assertEquals(data
['name'], 'example.com.')
691 def test_import_zone_broken(self
):
693 payload
['zone'] = """
694 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
695 flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
696 ;; WARNING: recursion requested but not available
698 ;; OPT PSEUDOSECTION:
699 ; EDNS: version: 0, flags:; udp: 1680
701 ;powerdns.com. IN SOA
704 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
705 powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
706 powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
707 powerdns-broken.com. 86400 IN A 82.94.213.34
708 powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
709 powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
710 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
712 payload
['name'] = 'powerdns-broken.com.'
713 payload
['kind'] = 'Master'
714 payload
['nameservers'] = []
715 r
= self
.session
.post(
716 self
.url("/api/v1/servers/localhost/zones"),
717 data
=json
.dumps(payload
),
718 headers
={'content-type': 'application/json'})
719 self
.assertEquals(r
.status_code
, 422)
721 def test_import_zone_axfr_outofzone(self
):
722 # Ensure we don't create out-of-zone records
723 name
= unique_zone_name()
725 payload
['zone'] = """
726 NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
727 NAME 3600 IN NS powerdnssec2.ds9a.nl.
728 example.org. 3600 IN AAAA 2001:888:2000:1d::2
729 NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
730 """.replace('NAME', name
)
731 payload
['name'] = name
732 payload
['kind'] = 'Master'
733 payload
['nameservers'] = []
734 r
= self
.session
.post(
735 self
.url("/api/v1/servers/localhost/zones"),
736 data
=json
.dumps(payload
),
737 headers
={'content-type': 'application/json'})
738 self
.assertEquals(r
.status_code
, 422)
739 self
.assertEqual(r
.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
741 def test_import_zone_axfr(self
):
743 payload
['zone'] = """
744 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
745 ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
746 ;; WARNING: recursion requested but not available
748 ;; OPT PSEUDOSECTION:
749 ; EDNS: version: 0, flags:; udp: 1680
751 ;powerdns.com. IN SOA
754 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
755 powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
756 powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
757 powerdns.com. 86400 IN A 82.94.213.34
758 powerdns.com. 3600 IN MX 0 xs.powerdns.com.
759 powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
760 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
762 payload
['name'] = 'powerdns.com.'
763 payload
['kind'] = 'Master'
764 payload
['nameservers'] = []
765 payload
['soa_edit_api'] = '' # turn off so exact SOA comparison works.
766 r
= self
.session
.post(
767 self
.url("/api/v1/servers/localhost/zones"),
768 data
=json
.dumps(payload
),
769 headers
={'content-type': 'application/json'})
770 self
.assert_success_json(r
)
772 self
.assertIn('name', data
)
776 {'content': 'powerdnssec1.ds9a.nl.'},
777 {'content': 'powerdnssec2.ds9a.nl.'},
780 {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
783 {'content': '0 xs.powerdns.com.'},
786 {'content': '82.94.213.34', 'name': 'powerdns.com.'},
789 {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
793 eq_zone_rrsets(data
['rrsets'], expected
)
795 # check content in DB is stored WITHOUT trailing dot.
796 dbrecs
= get_db_records(payload
['name'], 'NS')
797 dbrec
= next((dbrec
for dbrec
in dbrecs
if dbrec
['content'].startswith('powerdnssec1')))
798 self
.assertEqual(dbrec
['content'], 'powerdnssec1.ds9a.nl')
800 def test_import_zone_bind(self
):
802 payload
['zone'] = """
803 $TTL 86400 ; 24 hours could have been written as 24h or 1d
804 ; $TTL used for all RRs without explicit TTL value
806 @ 1D IN SOA ns1.example.org. hostmaster.example.org. (
813 IN NS ns1.example.org. ; in the domain
814 IN NS ns2.smokeyjoe.com. ; external to domain
815 IN MX 10 mail.another.com. ; external mail provider
816 ; server host definitions
817 ns1 IN A 192.168.0.1 ;name server definition
818 www IN A 192.168.0.2 ;web server definition
819 ftp IN CNAME www.example.org. ;ftp server definition
820 ; non server domain hosts
821 bill IN A 192.168.0.3
822 fred IN A 192.168.0.4
824 payload
['name'] = 'example.org.'
825 payload
['kind'] = 'Master'
826 payload
['nameservers'] = []
827 payload
['soa_edit_api'] = '' # turn off so exact SOA comparison works.
828 r
= self
.session
.post(
829 self
.url("/api/v1/servers/localhost/zones"),
830 data
=json
.dumps(payload
),
831 headers
={'content-type': 'application/json'})
832 self
.assert_success_json(r
)
834 self
.assertIn('name', data
)
838 {'content': 'ns1.example.org.'},
839 {'content': 'ns2.smokeyjoe.com.'},
842 {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
845 {'content': '10 mail.another.com.'},
848 {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
849 {'content': '192.168.0.2', 'name': 'www.example.org.'},
850 {'content': '192.168.0.3', 'name': 'bill.example.org.'},
851 {'content': '192.168.0.4', 'name': 'fred.example.org.'},
854 {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
858 eq_zone_rrsets(data
['rrsets'], expected
)
860 def test_export_zone_json(self
):
861 name
, payload
, zone
= self
.create_zone(nameservers
=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api
='')
863 r
= self
.session
.get(
864 self
.url("/api/v1/servers/localhost/zones/" + name
+ "/export"),
865 headers
={'accept': 'application/json;q=0.9,*/*;q=0.8'}
867 self
.assert_success_json(r
)
869 self
.assertIn('zone', data
)
870 expected_data
= [name
+ '\t3600\tIN\tNS\tns1.foo.com.',
871 name
+ '\t3600\tIN\tNS\tns2.foo.com.',
872 name
+ '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name
+
873 ' 0 10800 3600 604800 3600']
874 self
.assertEquals(data
['zone'].strip().split('\n'), expected_data
)
876 def test_export_zone_text(self
):
877 name
, payload
, zone
= self
.create_zone(nameservers
=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api
='')
879 r
= self
.session
.get(
880 self
.url("/api/v1/servers/localhost/zones/" + name
+ "/export"),
881 headers
={'accept': '*/*'}
883 data
= r
.text
.strip().split("\n")
884 expected_data
= [name
+ '\t3600\tIN\tNS\tns1.foo.com.',
885 name
+ '\t3600\tIN\tNS\tns2.foo.com.',
886 name
+ '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name
+
887 ' 0 10800 3600 604800 3600']
888 self
.assertEquals(data
, expected_data
)
890 def test_update_zone(self
):
891 name
, payload
, zone
= self
.create_zone()
892 name
= payload
['name']
893 # update, set as Master and enable SOA-EDIT-API
896 'masters': ['192.0.2.1', '192.0.2.2'],
897 'soa_edit_api': 'EPOCH',
900 r
= self
.session
.put(
901 self
.url("/api/v1/servers/localhost/zones/" + name
),
902 data
=json
.dumps(payload
),
903 headers
={'content-type': 'application/json'})
904 self
.assert_success(r
)
905 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
906 for k
in payload
.keys():
907 self
.assertIn(k
, data
)
908 self
.assertEquals(data
[k
], payload
[k
])
909 # update, back to Native and empty(off)
915 r
= self
.session
.put(
916 self
.url("/api/v1/servers/localhost/zones/" + name
),
917 data
=json
.dumps(payload
),
918 headers
={'content-type': 'application/json'})
919 self
.assert_success(r
)
920 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
921 for k
in payload
.keys():
922 self
.assertIn(k
, data
)
923 self
.assertEquals(data
[k
], payload
[k
])
925 def test_zone_rr_update(self
):
926 name
, payload
, zone
= self
.create_zone()
927 # do a replace (= update)
929 'changetype': 'replace',
935 "content": "ns1.bar.com.",
939 "content": "ns2-disabled.bar.com.",
944 payload
= {'rrsets': [rrset
]}
945 r
= self
.session
.patch(
946 self
.url("/api/v1/servers/localhost/zones/" + name
),
947 data
=json
.dumps(payload
),
948 headers
={'content-type': 'application/json'})
949 self
.assert_success(r
)
950 # verify that (only) the new record is there
951 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
952 self
.assertEquals(get_rrset(data
, name
, 'NS')['records'], rrset
['records'])
954 def test_zone_rr_update_mx(self
):
955 # Important to test with MX records, as they have a priority field, which must end up in the content field.
956 name
, payload
, zone
= self
.create_zone()
957 # do a replace (= update)
959 'changetype': 'replace',
965 "content": "10 mail.example.org.",
970 payload
= {'rrsets': [rrset
]}
971 r
= self
.session
.patch(
972 self
.url("/api/v1/servers/localhost/zones/" + name
),
973 data
=json
.dumps(payload
),
974 headers
={'content-type': 'application/json'})
975 self
.assert_success(r
)
976 # verify that (only) the new record is there
977 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
978 self
.assertEquals(get_rrset(data
, name
, 'MX')['records'], rrset
['records'])
980 def test_zone_rr_update_opt(self
):
981 name
, payload
, zone
= self
.create_zone()
982 # do a replace (= update)
984 'changetype': 'replace',
995 payload
= {'rrsets': [rrset
]}
996 r
= self
.session
.patch(
997 self
.url("/api/v1/servers/localhost/zones/" + name
),
998 data
=json
.dumps(payload
),
999 headers
={'content-type': 'application/json'})
1000 self
.assertEquals(r
.status_code
, 422)
1001 self
.assertIn('OPT: invalid type given', r
.json()['error'])
1003 def test_zone_rr_update_multiple_rrsets(self
):
1004 name
, payload
, zone
= self
.create_zone()
1006 'changetype': 'replace',
1013 "content": "ns9999.example.com.",
1019 'changetype': 'replace',
1025 "content": "10 mx444.example.com.",
1030 payload
= {'rrsets': [rrset1
, rrset2
]}
1031 r
= self
.session
.patch(
1032 self
.url("/api/v1/servers/localhost/zones/" + name
),
1033 data
=json
.dumps(payload
),
1034 headers
={'content-type': 'application/json'})
1035 self
.assert_success(r
)
1036 # verify that all rrsets have been updated
1037 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1038 self
.assertEquals(get_rrset(data
, name
, 'NS')['records'], rrset1
['records'])
1039 self
.assertEquals(get_rrset(data
, name
, 'MX')['records'], rrset2
['records'])
1041 def test_zone_rr_update_duplicate_record(self
):
1042 name
, payload
, zone
= self
.create_zone()
1044 'changetype': 'replace',
1049 {"content": "ns9999.example.com.", "disabled": False},
1050 {"content": "ns9996.example.com.", "disabled": False},
1051 {"content": "ns9987.example.com.", "disabled": False},
1052 {"content": "ns9988.example.com.", "disabled": False},
1053 {"content": "ns9999.example.com.", "disabled": False},
1056 payload
= {'rrsets': [rrset
]}
1057 r
= self
.session
.patch(
1058 self
.url("/api/v1/servers/localhost/zones/" + name
),
1059 data
=json
.dumps(payload
),
1060 headers
={'content-type': 'application/json'})
1061 self
.assertEquals(r
.status_code
, 422)
1062 self
.assertIn('Duplicate record in RRset', r
.json()['error'])
1064 def test_zone_rr_update_duplicate_rrset(self
):
1065 name
, payload
, zone
= self
.create_zone()
1067 'changetype': 'replace',
1073 "content": "ns9999.example.com.",
1079 'changetype': 'replace',
1085 "content": "ns9998.example.com.",
1090 payload
= {'rrsets': [rrset1
, rrset2
]}
1091 r
= self
.session
.patch(
1092 self
.url("/api/v1/servers/localhost/zones/" + name
),
1093 data
=json
.dumps(payload
),
1094 headers
={'content-type': 'application/json'})
1095 self
.assertEquals(r
.status_code
, 422)
1096 self
.assertIn('Duplicate RRset', r
.json()['error'])
1098 def test_zone_rr_delete(self
):
1099 name
, payload
, zone
= self
.create_zone()
1100 # do a delete of all NS records (these are created with the zone)
1102 'changetype': 'delete',
1106 payload
= {'rrsets': [rrset
]}
1107 r
= self
.session
.patch(
1108 self
.url("/api/v1/servers/localhost/zones/" + name
),
1109 data
=json
.dumps(payload
),
1110 headers
={'content-type': 'application/json'})
1111 self
.assert_success(r
)
1112 # verify that the records are gone
1113 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1114 self
.assertIsNone(get_rrset(data
, name
, 'NS'))
1116 def test_zone_disable_reenable(self
):
1117 # This also tests that SOA-EDIT-API works.
1118 name
, payload
, zone
= self
.create_zone(soa_edit_api
='EPOCH')
1119 # disable zone by disabling SOA
1121 'changetype': 'replace',
1127 "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
1132 payload
= {'rrsets': [rrset
]}
1133 r
= self
.session
.patch(
1134 self
.url("/api/v1/servers/localhost/zones/" + name
),
1135 data
=json
.dumps(payload
),
1136 headers
={'content-type': 'application/json'})
1137 self
.assert_success(r
)
1138 # check SOA serial has been edited
1139 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1140 soa_serial1
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1141 self
.assertNotEquals(soa_serial1
, '1')
1142 # make sure domain is still in zone list (disabled SOA!)
1143 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
1145 self
.assertEquals(len([domain
for domain
in domains
if domain
['name'] == name
]), 1)
1146 # sleep 1sec to ensure the EPOCH value changes for the next request
1148 # verify that modifying it still works
1149 rrset
['records'][0]['disabled'] = False
1150 payload
= {'rrsets': [rrset
]}
1151 r
= self
.session
.patch(
1152 self
.url("/api/v1/servers/localhost/zones/" + name
),
1153 data
=json
.dumps(payload
),
1154 headers
={'content-type': 'application/json'})
1155 self
.assert_success(r
)
1156 # check SOA serial has been edited again
1157 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1158 soa_serial2
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1159 self
.assertNotEquals(soa_serial2
, '1')
1160 self
.assertNotEquals(soa_serial2
, soa_serial1
)
1162 def test_zone_rr_update_out_of_zone(self
):
1163 name
, payload
, zone
= self
.create_zone()
1164 # replace with qname mismatch
1166 'changetype': 'replace',
1167 'name': 'not-in-zone.',
1172 "content": "ns1.bar.com.",
1177 payload
= {'rrsets': [rrset
]}
1178 r
= self
.session
.patch(
1179 self
.url("/api/v1/servers/localhost/zones/" + name
),
1180 data
=json
.dumps(payload
),
1181 headers
={'content-type': 'application/json'})
1182 self
.assertEquals(r
.status_code
, 422)
1183 self
.assertIn('out of zone', r
.json()['error'])
1185 def test_zone_rr_update_restricted_chars(self
):
1186 name
, payload
, zone
= self
.create_zone()
1187 # replace with qname mismatch
1189 'changetype': 'replace',
1190 'name': 'test:' + name
,
1195 "content": "ns1.bar.com.",
1200 payload
= {'rrsets': [rrset
]}
1201 r
= self
.session
.patch(
1202 self
.url("/api/v1/servers/localhost/zones/" + name
),
1203 data
=json
.dumps(payload
),
1204 headers
={'content-type': 'application/json'})
1205 self
.assertEquals(r
.status_code
, 422)
1206 self
.assertIn('contains unsupported characters', r
.json()['error'])
1208 def test_rrset_unknown_type(self
):
1209 name
, payload
, zone
= self
.create_zone()
1211 'changetype': 'replace',
1217 "content": "4.3.2.1",
1222 payload
= {'rrsets': [rrset
]}
1223 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1224 headers
={'content-type': 'application/json'})
1225 self
.assertEquals(r
.status_code
, 422)
1226 self
.assertIn('unknown type', r
.json()['error'])
1228 def test_rrset_cname_and_other(self
):
1229 name
, payload
, zone
= self
.create_zone()
1231 'changetype': 'replace',
1237 "content": "example.org.",
1242 payload
= {'rrsets': [rrset
]}
1243 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1244 headers
={'content-type': 'application/json'})
1245 self
.assertEquals(r
.status_code
, 422)
1246 self
.assertIn('Conflicts with pre-existing non-CNAME RRset', r
.json()['error'])
1248 def test_rrset_other_and_cname(self
):
1249 name
, payload
, zone
= self
.create_zone()
1251 'changetype': 'replace',
1252 'name': 'sub.'+name
,
1257 "content": "example.org.",
1262 payload
= {'rrsets': [rrset
]}
1263 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1264 headers
={'content-type': 'application/json'})
1265 self
.assert_success(r
)
1267 'changetype': 'replace',
1268 'name': 'sub.'+name
,
1273 "content": "1.2.3.4",
1278 payload
= {'rrsets': [rrset
]}
1279 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1280 headers
={'content-type': 'application/json'})
1281 self
.assertEquals(r
.status_code
, 422)
1282 self
.assertIn('Conflicts with pre-existing CNAME RRset', r
.json()['error'])
1284 def test_create_zone_with_leading_space(self
):
1285 # Actual regression.
1286 name
, payload
, zone
= self
.create_zone()
1288 'changetype': 'replace',
1294 "content": " 4.3.2.1",
1299 payload
= {'rrsets': [rrset
]}
1300 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1301 headers
={'content-type': 'application/json'})
1302 self
.assertEquals(r
.status_code
, 422)
1303 self
.assertIn('Not in expected format', r
.json()['error'])
1305 def test_zone_rr_delete_out_of_zone(self
):
1306 name
, payload
, zone
= self
.create_zone()
1308 'changetype': 'delete',
1309 'name': 'not-in-zone.',
1312 payload
= {'rrsets': [rrset
]}
1313 r
= self
.session
.patch(
1314 self
.url("/api/v1/servers/localhost/zones/" + name
),
1315 data
=json
.dumps(payload
),
1316 headers
={'content-type': 'application/json'})
1318 self
.assert_success(r
) # succeed so users can fix their wrong, old data
1320 def test_zone_delete(self
):
1321 name
, payload
, zone
= self
.create_zone()
1322 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + name
))
1323 self
.assertEquals(r
.status_code
, 204)
1324 self
.assertNotIn('Content-Type', r
.headers
)
1326 def test_zone_comment_create(self
):
1327 name
, payload
, zone
= self
.create_zone()
1329 'changetype': 'replace',
1336 'content': 'blah blah',
1340 'content': 'blah blah bleh',
1344 payload
= {'rrsets': [rrset
]}
1345 r
= self
.session
.patch(
1346 self
.url("/api/v1/servers/localhost/zones/" + name
),
1347 data
=json
.dumps(payload
),
1348 headers
={'content-type': 'application/json'})
1349 self
.assert_success(r
)
1350 # make sure the comments have been set, and that the NS
1351 # records are still present
1352 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1353 serverset
= get_rrset(data
, name
, 'NS')
1355 self
.assertNotEquals(serverset
['records'], [])
1356 self
.assertNotEquals(serverset
['comments'], [])
1357 # verify that modified_at has been set by pdns
1358 self
.assertNotEquals([c
for c
in serverset
['comments']][0]['modified_at'], 0)
1359 # verify that TTL is correct (regression test)
1360 self
.assertEquals(serverset
['ttl'], 3600)
1362 def test_zone_comment_delete(self
):
1363 # Test: Delete ONLY comments.
1364 name
, payload
, zone
= self
.create_zone()
1366 'changetype': 'replace',
1371 payload
= {'rrsets': [rrset
]}
1372 r
= self
.session
.patch(
1373 self
.url("/api/v1/servers/localhost/zones/" + name
),
1374 data
=json
.dumps(payload
),
1375 headers
={'content-type': 'application/json'})
1376 self
.assert_success(r
)
1377 # make sure the NS records are still present
1378 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1379 serverset
= get_rrset(data
, name
, 'NS')
1381 self
.assertNotEquals(serverset
['records'], [])
1382 self
.assertEquals(serverset
['comments'], [])
1384 def test_zone_comment_stay_intact(self
):
1385 # Test if comments on an rrset stay intact if the rrset is replaced
1386 name
, payload
, zone
= self
.create_zone()
1389 'changetype': 'replace',
1395 'content': 'oh hi there',
1400 payload
= {'rrsets': [rrset
]}
1401 r
= self
.session
.patch(
1402 self
.url("/api/v1/servers/localhost/zones/" + name
),
1403 data
=json
.dumps(payload
),
1404 headers
={'content-type': 'application/json'})
1405 self
.assert_success(r
)
1406 # replace rrset records
1408 'changetype': 'replace',
1414 "content": "ns1.bar.com.",
1419 payload2
= {'rrsets': [rrset2
]}
1420 r
= self
.session
.patch(
1421 self
.url("/api/v1/servers/localhost/zones/" + name
),
1422 data
=json
.dumps(payload2
),
1423 headers
={'content-type': 'application/json'})
1424 self
.assert_success(r
)
1425 # make sure the comments still exist
1426 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1427 serverset
= get_rrset(data
, name
, 'NS')
1429 self
.assertEquals(serverset
['records'], rrset2
['records'])
1430 self
.assertEquals(serverset
['comments'], rrset
['comments'])
1432 def test_zone_auto_ptr_ipv4_create(self
):
1433 revzone
= '4.2.192.in-addr.arpa.'
1434 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1435 name
= unique_zone_name()
1441 "content": "192.2.4.44",
1446 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
1447 del rrset
['records'][0]['set-ptr']
1448 self
.assertEquals(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
1449 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1450 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1452 self
.assertEquals(revsets
, [{
1453 u
'name': u
'44.4.2.192.in-addr.arpa.',
1462 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1463 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1465 def test_zone_auto_ptr_ipv4_update(self
):
1466 revzone
= '0.2.192.in-addr.arpa.'
1467 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1468 name
, payload
, zone
= self
.create_zone()
1470 'changetype': 'replace',
1476 "content": '192.2.0.2',
1482 payload
= {'rrsets': [rrset
]}
1483 r
= self
.session
.patch(
1484 self
.url("/api/v1/servers/localhost/zones/" + name
),
1485 data
=json
.dumps(payload
),
1486 headers
={'content-type': 'application/json'})
1487 self
.assert_success(r
)
1488 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1489 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1491 self
.assertEquals(revsets
, [{
1492 u
'name': u
'2.0.2.192.in-addr.arpa.',
1501 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1502 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1504 def test_zone_auto_ptr_ipv6_update(self
):
1506 revzone
= '8.b.d.0.1.0.0.2.ip6.arpa.'
1507 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1508 name
, payload
, zone
= self
.create_zone()
1510 'changetype': 'replace',
1516 "content": '2001:DB8::bb:aa',
1522 payload
= {'rrsets': [rrset
]}
1523 r
= self
.session
.patch(
1524 self
.url("/api/v1/servers/localhost/zones/" + name
),
1525 data
=json
.dumps(payload
),
1526 headers
={'content-type': 'application/json'})
1527 self
.assert_success(r
)
1528 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1529 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1531 self
.assertEquals(revsets
, [{
1532 u
'name': u
'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
1541 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1542 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1544 def test_search_rr_exact_zone(self
):
1545 name
= unique_zone_name()
1546 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
1547 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.')))
1548 self
.assert_success_json(r
)
1550 self
.assertEquals(r
.json(), [
1551 {u
'object_type': u
'zone', u
'name': name
, u
'zone_id': name
},
1552 {u
'content': u
'ns1.example.com.',
1553 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1554 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1555 {u
'content': u
'ns2.example.com.',
1556 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1557 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1558 {u
'content': u
'a.misconfigured.powerdns.server. hostmaster.'+name
+' 22 10800 3600 604800 3600',
1559 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1560 u
'ttl': 3600, u
'type': u
'SOA', u
'name': name
},
1563 def test_search_rr_substring(self
):
1564 name
= unique_zone_name()
1566 self
.create_zone(name
=name
)
1567 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
1568 self
.assert_success_json(r
)
1570 # should return zone, SOA, ns1, ns2
1571 self
.assertEquals(len(r
.json()), 4)
1573 def test_search_rr_case_insensitive(self
):
1574 name
= unique_zone_name()+'testsuffix.'
1575 self
.create_zone(name
=name
)
1576 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
1577 self
.assert_success_json(r
)
1579 # should return zone, SOA, ns1, ns2
1580 self
.assertEquals(len(r
.json()), 4)
1582 def test_search_after_rectify_with_ent(self
):
1583 name
= unique_zone_name()
1584 search
= name
.split('.')[0]
1586 "name": 'sub.sub.' + name
,
1590 "content": "4.3.2.1",
1594 self
.create_zone(name
=name
, rrsets
=[rrset
])
1595 pdnsutil_rectify(name
)
1596 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
1597 self
.assert_success_json(r
)
1599 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
1600 self
.assertEquals(len(r
.json()), 5)
1602 def test_cname_at_ent_place(self
):
1603 name
, payload
, zone
= self
.create_zone(api_rectify
=True)
1605 'changetype': 'replace',
1606 'name': 'sub2.sub1.' + name
,
1610 'content': "4.3.2.1",
1614 payload
= {'rrsets': [rrset
]}
1615 r
= self
.session
.patch(
1616 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
1617 data
=json
.dumps(payload
),
1618 headers
={'content-type': 'application/json'})
1619 self
.assertEquals(r
.status_code
, 204)
1621 'changetype': 'replace',
1622 'name': 'sub1.' + name
,
1626 'content': "www.example.org.",
1630 payload
= {'rrsets': [rrset
]}
1631 r
= self
.session
.patch(
1632 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
1633 data
=json
.dumps(payload
),
1634 headers
={'content-type': 'application/json'})
1635 self
.assertEquals(r
.status_code
, 204)
1637 def test_rrset_parameter_post_false(self
):
1638 name
= unique_zone_name()
1642 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
1644 r
= self
.session
.post(
1645 self
.url("/api/v1/servers/localhost/zones?rrsets=false"),
1646 data
=json
.dumps(payload
),
1647 headers
={'content-type': 'application/json'})
1649 self
.assert_success_json(r
)
1650 self
.assertEquals(r
.status_code
, 201)
1651 self
.assertEquals(r
.json().get('rrsets'), None)
1653 def test_rrset_false_parameter(self
):
1654 name
= unique_zone_name()
1655 self
.create_zone(name
=name
, kind
='Native')
1656 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/"+name
+"?rrsets=false"))
1657 self
.assert_success_json(r
)
1659 self
.assertEquals(r
.json().get('rrsets'), None)
1661 def test_rrset_true_parameter(self
):
1662 name
= unique_zone_name()
1663 self
.create_zone(name
=name
, kind
='Native')
1664 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/"+name
+"?rrsets=true"))
1665 self
.assert_success_json(r
)
1667 self
.assertEquals(len(r
.json().get('rrsets')), 2)
1669 def test_wrong_rrset_parameter(self
):
1670 name
= unique_zone_name()
1671 self
.create_zone(name
=name
, kind
='Native')
1672 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/"+name
+"?rrsets=foobar"))
1673 self
.assertEquals(r
.status_code
, 422)
1674 self
.assertIn("'rrsets' request parameter value 'foobar' is not supported", r
.json()['error'])
1677 @unittest.skipIf(not is_auth(), "Not applicable")
1678 class AuthRootZone(ApiTestCase
, AuthZonesHelperMixin
):
1681 super(AuthRootZone
, self
).setUp()
1682 # zone name is not unique, so delete the zone before each individual test.
1683 self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/=2E"))
1685 def test_create_zone(self
):
1686 name
, payload
, data
= self
.create_zone(name
='.', serial
=22, soa_edit_api
='')
1687 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
1688 self
.assertIn(k
, data
)
1690 self
.assertEquals(data
[k
], payload
[k
])
1691 # validate generated SOA
1692 rec
= get_first_rec(data
, '.', 'SOA')
1695 "a.misconfigured.powerdns.server. hostmaster. " + str(payload
['serial']) +
1696 " 10800 3600 604800 3600"
1698 # Regression test: verify zone list works
1699 zonelist
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones")).json()
1700 print("zonelist:", zonelist
)
1701 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
1702 # Also test that fetching the zone works.
1703 print("id:", data
['id'])
1704 self
.assertEquals(data
['id'], '=2E')
1705 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id'])).json()
1706 print("zone (fetched):", data
)
1707 for k
in ('name', 'kind'):
1708 self
.assertIn(k
, data
)
1709 self
.assertEquals(data
[k
], payload
[k
])
1710 self
.assertEqual(data
['rrsets'][0]['name'], '.')
1712 def test_update_zone(self
):
1713 name
, payload
, zone
= self
.create_zone(name
='.')
1715 # update, set as Master and enable SOA-EDIT-API
1718 'masters': ['192.0.2.1', '192.0.2.2'],
1719 'soa_edit_api': 'EPOCH',
1722 r
= self
.session
.put(
1723 self
.url("/api/v1/servers/localhost/zones/" + zone_id
),
1724 data
=json
.dumps(payload
),
1725 headers
={'content-type': 'application/json'})
1726 self
.assert_success(r
)
1727 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
)).json()
1728 for k
in payload
.keys():
1729 self
.assertIn(k
, data
)
1730 self
.assertEquals(data
[k
], payload
[k
])
1731 # update, back to Native and empty(off)
1737 r
= self
.session
.put(
1738 self
.url("/api/v1/servers/localhost/zones/" + zone_id
),
1739 data
=json
.dumps(payload
),
1740 headers
={'content-type': 'application/json'})
1741 self
.assert_success(r
)
1742 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
)).json()
1743 for k
in payload
.keys():
1744 self
.assertIn(k
, data
)
1745 self
.assertEquals(data
[k
], payload
[k
])
1748 @unittest.skipIf(not is_recursor(), "Not applicable")
1749 class RecursorZones(ApiTestCase
):
1751 def create_zone(self
, name
=None, kind
=None, rd
=False, servers
=None):
1753 name
= unique_zone_name()
1760 'recursion_desired': rd
1762 r
= self
.session
.post(
1763 self
.url("/api/v1/servers/localhost/zones"),
1764 data
=json
.dumps(payload
),
1765 headers
={'content-type': 'application/json'})
1766 self
.assert_success_json(r
)
1767 return payload
, r
.json()
1769 def test_create_auth_zone(self
):
1770 payload
, data
= self
.create_zone(kind
='Native')
1771 for k
in payload
.keys():
1772 self
.assertEquals(data
[k
], payload
[k
])
1774 def test_create_zone_no_name(self
):
1778 'servers': ['8.8.8.8'],
1779 'recursion_desired': False,
1782 r
= self
.session
.post(
1783 self
.url("/api/v1/servers/localhost/zones"),
1784 data
=json
.dumps(payload
),
1785 headers
={'content-type': 'application/json'})
1786 self
.assertEquals(r
.status_code
, 422)
1787 self
.assertIn('is not canonical', r
.json()['error'])
1789 def test_create_forwarded_zone(self
):
1790 payload
, data
= self
.create_zone(kind
='Forwarded', rd
=False, servers
=['8.8.8.8'])
1791 # return values are normalized
1792 payload
['servers'][0] += ':53'
1793 for k
in payload
.keys():
1794 self
.assertEquals(data
[k
], payload
[k
])
1796 def test_create_forwarded_rd_zone(self
):
1797 payload
, data
= self
.create_zone(name
='google.com.', kind
='Forwarded', rd
=True, servers
=['8.8.8.8'])
1798 # return values are normalized
1799 payload
['servers'][0] += ':53'
1800 for k
in payload
.keys():
1801 self
.assertEquals(data
[k
], payload
[k
])
1803 def test_create_auth_zone_with_symbols(self
):
1804 payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name(), kind
='Native')
1805 expected_id
= (payload
['name'].replace('/', '=2F'))
1806 for k
in payload
.keys():
1807 self
.assertEquals(data
[k
], payload
[k
])
1808 self
.assertEquals(data
['id'], expected_id
)
1810 def test_rename_auth_zone(self
):
1811 payload
, data
= self
.create_zone(kind
='Native')
1812 name
= payload
['name']
1815 'name': 'renamed-'+name
,
1817 'recursion_desired': False
1819 r
= self
.session
.put(
1820 self
.url("/api/v1/servers/localhost/zones/" + name
),
1821 data
=json
.dumps(payload
),
1822 headers
={'content-type': 'application/json'})
1823 self
.assert_success(r
)
1824 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + payload
['name'])).json()
1825 for k
in payload
.keys():
1826 self
.assertEquals(data
[k
], payload
[k
])
1828 def test_zone_delete(self
):
1829 payload
, zone
= self
.create_zone(kind
='Native')
1830 name
= payload
['name']
1831 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + name
))
1832 self
.assertEquals(r
.status_code
, 204)
1833 self
.assertNotIn('Content-Type', r
.headers
)
1835 def test_search_rr_exact_zone(self
):
1836 name
= unique_zone_name()
1837 self
.create_zone(name
=name
, kind
='Native')
1838 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
))
1839 self
.assert_success_json(r
)
1841 self
.assertEquals(r
.json(), [{u
'type': u
'zone', u
'name': name
, u
'zone_id': name
}])
1843 def test_search_rr_substring(self
):
1844 name
= 'search-rr-zone.name.'
1845 self
.create_zone(name
=name
, kind
='Native')
1846 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
1847 self
.assert_success_json(r
)
1849 # should return zone, SOA
1850 self
.assertEquals(len(r
.json()), 2)
1852 @unittest.skipIf(not is_auth(), "Not applicable")
1853 class AuthZoneKeys(ApiTestCase
, AuthZonesHelperMixin
):
1855 def test_get_keys(self
):
1856 r
= self
.session
.get(
1857 self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
1858 self
.assert_success_json(r
)
1860 self
.assertGreater(len(keys
), 0)
1862 key0
= deepcopy(keys
[0])
1866 u
'algorithm': u
'ECDSAP256SHA256',
1869 u
'type': u
'Cryptokey',
1873 self
.assertEquals(key0
, expected
)
1875 keydata
= keys
[0]['dnskey'].split()
1876 self
.assertEqual(len(keydata
), 4)