]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
1 from __future__
import print_function
5 from copy
import deepcopy
6 from pprint
import pprint
7 from test_helper
import ApiTestCase
, unique_zone_name
, is_auth
, is_recursor
, get_db_records
, pdnsutil_rectify
10 def get_rrset(data
, qname
, qtype
):
11 for rrset
in data
['rrsets']:
12 if rrset
['name'] == qname
and rrset
['type'] == qtype
:
17 def get_first_rec(data
, qname
, qtype
):
18 rrset
= get_rrset(data
, qname
, qtype
)
20 return rrset
['records'][0]
24 def eq_zone_rrsets(rrsets
, expected
):
27 for type_
, expected_records
in expected
.items():
29 data_got
[type_
] = set()
30 data_expected
[type_
] = set()
31 uses_name
= any(['name' in expected_record
for expected_record
in expected_records
])
32 # minify + convert received data
33 for rrset
in [rrset
for rrset
in rrsets
if rrset
['type'] == type_
]:
35 for r
in rrset
['records']:
36 data_got
[type_
].add((rrset
['name'] if uses_name
else '@', rrset
['type'], r
['content']))
37 # minify expected data
38 for r
in expected_records
:
39 data_expected
[type_
].add((r
['name'] if uses_name
else '@', type_
, r
['content']))
41 print("eq_zone_rrsets: got:")
43 print("eq_zone_rrsets: expected:")
46 assert data_got
== data_expected
, "%r != %r" % (data_got
, data_expected
)
49 class Zones(ApiTestCase
):
51 def test_list_zones(self
):
52 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
53 self
.assert_success_json(r
)
55 example_com
= [domain
for domain
in domains
if domain
['name'] in ('example.com', 'example.com.')]
56 self
.assertEquals(len(example_com
), 1)
57 example_com
= example_com
[0]
59 required_fields
= ['id', 'url', 'name', 'kind']
61 required_fields
= required_fields
+ ['masters', 'last_check', 'notified_serial', 'serial', 'account']
62 self
.assertNotEquals(example_com
['serial'], 0)
64 required_fields
= required_fields
+ ['recursion_desired', 'servers']
65 for field
in required_fields
:
66 self
.assertIn(field
, example_com
)
69 class AuthZonesHelperMixin(object):
70 def create_zone(self
, name
=None, **kwargs
):
72 name
= unique_zone_name()
76 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
78 for k
, v
in kwargs
.items():
83 print("sending", payload
)
84 r
= self
.session
.post(
85 self
.url("/api/v1/servers/localhost/zones"),
86 data
=json
.dumps(payload
),
87 headers
={'content-type': 'application/json'})
88 self
.assert_success_json(r
)
89 self
.assertEquals(r
.status_code
, 201)
92 return name
, payload
, reply
95 @unittest.skipIf(not is_auth(), "Not applicable")
96 class AuthZones(ApiTestCase
, AuthZonesHelperMixin
):
98 def test_create_zone(self
):
99 # soa_edit_api has a default, override with empty for this test
100 name
, payload
, data
= self
.create_zone(serial
=22, soa_edit_api
='')
101 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
102 self
.assertIn(k
, data
)
104 self
.assertEquals(data
[k
], payload
[k
])
105 # validate generated SOA
106 expected_soa
= "a.misconfigured.powerdns.server. hostmaster." + name
+ " " + \
107 str(payload
['serial']) + " 10800 3600 604800 3600"
109 get_first_rec(data
, name
, 'SOA')['content'],
112 # Because we had confusion about dots, check that the DB is without dots.
113 dbrecs
= get_db_records(name
, 'SOA')
114 self
.assertEqual(dbrecs
[0]['content'], expected_soa
.replace('. ', ' '))
116 def test_create_zone_with_soa_edit_api(self
):
117 # soa_edit_api wins over serial
118 name
, payload
, data
= self
.create_zone(soa_edit_api
='EPOCH', serial
=10)
119 for k
in ('soa_edit_api', ):
120 self
.assertIn(k
, data
)
122 self
.assertEquals(data
[k
], payload
[k
])
123 # generated EPOCH serial surely is > fixed serial we passed in
125 self
.assertGreater(data
['serial'], payload
['serial'])
126 soa_serial
= int(get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2])
127 self
.assertGreater(soa_serial
, payload
['serial'])
128 self
.assertEquals(soa_serial
, data
['serial'])
130 def test_create_zone_with_account(self
):
131 # the account given at creation time must be reported back by the API
132 name
, payload
, data
= self
.create_zone(account
='anaccount', serial
=10)
134 for k
in ('account', ):
135 self
.assertIn(k
, data
)
137 self
.assertEquals(data
[k
], payload
[k
])
139 def test_create_zone_default_soa_edit_api(self
):
140 name
, payload
, data
= self
.create_zone()
142 self
.assertEquals(data
['soa_edit_api'], 'DEFAULT')
144 def test_create_zone_exists(self
):
145 name
, payload
, data
= self
.create_zone()
152 r
= self
.session
.post(
153 self
.url("/api/v1/servers/localhost/zones"),
154 data
=json
.dumps(payload
),
155 headers
={'content-type': 'application/json'})
156 self
.assertEquals(r
.status_code
, 409) # Conflict - already exists
158 def test_create_zone_with_soa_edit(self
):
159 name
, payload
, data
= self
.create_zone(soa_edit
='INCEPTION-INCREMENT', soa_edit_api
='SOA-EDIT-INCREASE')
161 self
.assertEquals(data
['soa_edit'], 'INCEPTION-INCREMENT')
162 self
.assertEquals(data
['soa_edit_api'], 'SOA-EDIT-INCREASE')
163 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
164 # These particular settings lead to the first serial set to YYYYMMDD01.
165 self
.assertEquals(soa_serial
[-2:], '01')
167 'changetype': 'replace',
173 "content": "127.0.0.1",
178 payload
= {'rrsets': [rrset
]}
180 self
.url("/api/v1/servers/localhost/zones/" + data
['id']),
181 data
=json
.dumps(payload
),
182 headers
={'content-type': 'application/json'})
183 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
185 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
186 self
.assertEquals(soa_serial
[-2:], '02')
188 def test_create_zone_with_records(self
):
189 name
= unique_zone_name()
195 "content": "4.3.2.1",
199 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
200 # check our record has appeared
201 self
.assertEquals(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
203 def test_create_zone_with_wildcard_records(self
):
204 name
= unique_zone_name()
210 "content": "4.3.2.1",
214 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
215 # check our record has appeared
216 self
.assertEquals(get_rrset(data
, rrset
['name'], 'A')['records'], rrset
['records'])
218 def test_create_zone_with_comments(self
):
219 name
= unique_zone_name()
223 "type": "soa", # test uppercasing of type, too.
226 "content": "blah blah",
227 "modified_at": 11112,
235 "content": "2001:DB8::1",
239 "account": "test AAAA",
240 "content": "blah blah AAAA",
241 "modified_at": 11112,
249 "content": "\"test TXT\"",
258 "content": "192.0.2.1",
263 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=rrsets
)
264 # NS records have been created
265 self
.assertEquals(len(data
['rrsets']), len(rrsets
) + 1)
266 # check our comment has appeared
267 self
.assertEquals(get_rrset(data
, name
, 'SOA')['comments'], rrsets
[0]['comments'])
268 self
.assertEquals(get_rrset(data
, name
, 'A')['comments'], [])
269 self
.assertEquals(get_rrset(data
, name
, 'TXT')['comments'], [])
270 self
.assertEquals(get_rrset(data
, name
, 'AAAA')['comments'], rrsets
[1]['comments'])
272 def test_create_zone_uncanonical_nameservers(self
):
273 name
= unique_zone_name()
277 'nameservers': ['uncanon.example.com']
280 r
= self
.session
.post(
281 self
.url("/api/v1/servers/localhost/zones"),
282 data
=json
.dumps(payload
),
283 headers
={'content-type': 'application/json'})
284 self
.assertEquals(r
.status_code
, 422)
285 self
.assertIn('Nameserver is not canonical', r
.json()['error'])
287 def test_create_auth_zone_no_name(self
):
288 name
= unique_zone_name()
294 r
= self
.session
.post(
295 self
.url("/api/v1/servers/localhost/zones"),
296 data
=json
.dumps(payload
),
297 headers
={'content-type': 'application/json'})
298 self
.assertEquals(r
.status_code
, 422)
299 self
.assertIn('is not canonical', r
.json()['error'])
301 def test_create_zone_with_custom_soa(self
):
302 name
= unique_zone_name()
303 content
= u
"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
306 "type": "soa", # test uppercasing of type, too.
313 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
], soa_edit_api
='')
314 self
.assertEquals(get_rrset(data
, name
, 'SOA')['records'], rrset
['records'])
315 dbrecs
= get_db_records(name
, 'SOA')
316 self
.assertEqual(dbrecs
[0]['content'], content
.replace('. ', ' '))
318 def test_create_zone_double_dot(self
):
319 name
= 'test..' + unique_zone_name()
323 'nameservers': ['ns1.example.com.']
326 r
= self
.session
.post(
327 self
.url("/api/v1/servers/localhost/zones"),
328 data
=json
.dumps(payload
),
329 headers
={'content-type': 'application/json'})
330 self
.assertEquals(r
.status_code
, 422)
331 self
.assertIn('Unable to parse DNS Name', r
.json()['error'])
333 def test_create_zone_restricted_chars(self
):
334 name
= 'test:' + unique_zone_name() # : isn't good as a name.
338 'nameservers': ['ns1.example.com']
341 r
= self
.session
.post(
342 self
.url("/api/v1/servers/localhost/zones"),
343 data
=json
.dumps(payload
),
344 headers
={'content-type': 'application/json'})
345 self
.assertEquals(r
.status_code
, 422)
346 self
.assertIn('contains unsupported characters', r
.json()['error'])
348 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self
):
349 name
= unique_zone_name()
355 "content": "ns2.example.com.",
362 'nameservers': ['ns1.example.com.'],
366 r
= self
.session
.post(
367 self
.url("/api/v1/servers/localhost/zones"),
368 data
=json
.dumps(payload
),
369 headers
={'content-type': 'application/json'})
370 self
.assertEquals(r
.status_code
, 422)
371 self
.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r
.json()['error'])
373 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self
):
374 name
= unique_zone_name()
376 "name": 'subzone.'+name
,
380 "content": "ns2.example.com.",
387 'nameservers': ['ns1.example.com.'],
391 r
= self
.session
.post(
392 self
.url("/api/v1/servers/localhost/zones"),
393 data
=json
.dumps(payload
),
394 headers
={'content-type': 'application/json'})
395 self
.assert_success_json(r
)
397 def test_create_zone_with_symbols(self
):
398 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
399 name
= payload
['name']
400 expected_id
= name
.replace('/', '=2F')
401 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
402 self
.assertIn(k
, data
)
404 self
.assertEquals(data
[k
], payload
[k
])
405 self
.assertEquals(data
['id'], expected_id
)
406 dbrecs
= get_db_records(name
, 'SOA')
407 self
.assertEqual(dbrecs
[0]['name'], name
.rstrip('.'))
409 def test_create_zone_with_nameservers_non_string(self
):
410 # ensure we don't crash
411 name
= unique_zone_name()
415 'nameservers': [{'a': 'ns1.example.com'}] # invalid
418 r
= self
.session
.post(
419 self
.url("/api/v1/servers/localhost/zones"),
420 data
=json
.dumps(payload
),
421 headers
={'content-type': 'application/json'})
422 self
.assertEquals(r
.status_code
, 422)
424 def test_create_zone_with_dnssec(self
):
426 Create a zone with "dnssec" set and see if a key was made.
428 name
= unique_zone_name()
429 name
, payload
, data
= self
.create_zone(dnssec
=True)
431 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
433 for k
in ('dnssec', ):
434 self
.assertIn(k
, data
)
436 self
.assertEquals(data
[k
], payload
[k
])
438 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
444 self
.assertEquals(r
.status_code
, 200)
445 self
.assertEquals(len(keys
), 1)
446 self
.assertEquals(keys
[0]['type'], 'Cryptokey')
447 self
.assertEquals(keys
[0]['active'], True)
448 self
.assertEquals(keys
[0]['keytype'], 'csk')
450 def test_create_zone_with_dnssec_disable_dnssec(self
):
452 Create a zone with "dnssec", then set "dnssec" to false and see if the
455 name
= unique_zone_name()
456 name
, payload
, data
= self
.create_zone(dnssec
=True)
458 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
459 data
=json
.dumps({'dnssec': False}))
460 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
464 self
.assertEquals(r
.status_code
, 200)
465 self
.assertEquals(zoneinfo
['dnssec'], False)
467 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
471 self
.assertEquals(r
.status_code
, 200)
472 self
.assertEquals(len(keys
), 0)
474 def test_create_zone_with_nsec3param(self
):
476 Create a zone with "nsec3param" set and see if the metadata was added.
478 name
= unique_zone_name()
479 nsec3param
= '1 0 500 aabbccddeeff'
480 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
)
482 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
484 for k
in ('dnssec', 'nsec3param'):
485 self
.assertIn(k
, data
)
487 self
.assertEquals(data
[k
], payload
[k
])
489 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3PARAM'))
495 self
.assertEquals(r
.status_code
, 200)
496 self
.assertEquals(len(data
['metadata']), 1)
497 self
.assertEquals(data
['kind'], 'NSEC3PARAM')
498 self
.assertEquals(data
['metadata'][0], nsec3param
)
500 def test_create_zone_with_nsec3narrow(self
):
502 Create a zone with "nsec3narrow" set and see if the metadata was added.
504 name
= unique_zone_name()
505 nsec3param
= '1 0 500 aabbccddeeff'
506 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
,
509 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
511 for k
in ('dnssec', 'nsec3param', 'nsec3narrow'):
512 self
.assertIn(k
, data
)
514 self
.assertEquals(data
[k
], payload
[k
])
516 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3NARROW'))
522 self
.assertEquals(r
.status_code
, 200)
523 self
.assertEquals(len(data
['metadata']), 1)
524 self
.assertEquals(data
['kind'], 'NSEC3NARROW')
525 self
.assertEquals(data
['metadata'][0], '1')
527 def test_create_zone_dnssec_serial(self
):
529 Create a zone set/unset "dnssec" and see if the serial was increased
532 name
= unique_zone_name()
533 name
, payload
, data
= self
.create_zone()
535 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
536 self
.assertEquals(soa_serial
[-2:], '01')
538 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
539 data
=json
.dumps({'dnssec': True}))
540 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
543 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
545 self
.assertEquals(r
.status_code
, 200)
546 self
.assertEquals(soa_serial
[-2:], '02')
548 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
549 data
=json
.dumps({'dnssec': False}))
550 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
553 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
555 self
.assertEquals(r
.status_code
, 200)
556 self
.assertEquals(soa_serial
[-2:], '03')
558 def test_zone_absolute_url(self
):
559 name
, payload
, data
= self
.create_zone()
560 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
563 self
.assertTrue(rdata
[0]['url'].startswith('/api/v'))
565 def test_create_zone_metadata(self
):
566 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
567 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
568 data
=json
.dumps(payload_metadata
))
570 self
.assertEquals(r
.status_code
, 201)
571 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
573 def test_create_zone_metadata_kind(self
):
574 payload_metadata
= {"metadata": ["127.0.0.2"]}
575 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
576 data
=json
.dumps(payload_metadata
))
578 self
.assertEquals(r
.status_code
, 200)
579 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
def test_create_protected_zone_metadata(self):
    """Check that the API refuses to modify certain metadata kinds.

    For every kind listed below, a PUT with a two-value metadata payload is
    sent to the metadata endpoint of the ``example.com`` zone, and the
    response status is expected to be 422.
    """
    # The payload does not depend on the kind, so build it once.
    payload = {"metadata": ["FOO", "BAR"]}
    for kind in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
        r = self.session.put(
            self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % kind),
            data=json.dumps(payload))
        # assertEqual rather than the deprecated assertEquals alias, which
        # no longer exists on unittest.TestCase in Python 3.12+.
        self.assertEqual(r.status_code, 422)
589 def test_retrieve_zone_metadata(self
):
590 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
591 self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
592 data
=json
.dumps(payload_metadata
))
593 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"))
595 self
.assertEquals(r
.status_code
, 200)
596 self
.assertIn(payload_metadata
, rdata
)
598 def test_delete_zone_metadata(self
):
599 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
600 self
.assertEquals(r
.status_code
, 200)
601 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
603 self
.assertEquals(r
.status_code
, 200)
604 self
.assertEquals(rdata
["metadata"], [])
606 def test_create_external_zone_metadata(self
):
607 payload_metadata
= {"metadata": ["My very important message"]}
608 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
609 data
=json
.dumps(payload_metadata
))
610 self
.assertEquals(r
.status_code
, 200)
612 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
def test_create_metadata_in_non_existent_zone(self):
    """Check that posting metadata to an unknown zone is answered with 404.

    An AXFR-SOURCE metadata object is POSTed to the metadata endpoint of
    ``idonotexist.123.456.example.`` and the response status is expected to
    be 404.
    """
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
        data=json.dumps(payload_metadata))
    # assertEqual rather than the deprecated assertEquals alias, which
    # no longer exists on unittest.TestCase in Python 3.12+.
    self.assertEqual(r.status_code, 404)
    # Note: errors should probably contain json (see #5988)
    # self.assertIn('Could not find domain ', r.json()['error'])
622 def test_create_slave_zone(self
):
623 # Test that nameservers can be absent for slave zones.
624 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
625 for k
in ('name', 'masters', 'kind'):
626 self
.assertIn(k
, data
)
627 self
.assertEquals(data
[k
], payload
[k
])
628 print("payload:", payload
)
630 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
631 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
633 print("zonelist:", zonelist
)
634 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
635 # Also test that fetching the zone works.
636 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
638 print("zone (fetched):", data
)
639 for k
in ('name', 'masters', 'kind'):
640 self
.assertIn(k
, data
)
641 self
.assertEquals(data
[k
], payload
[k
])
642 self
.assertEqual(data
['serial'], 0)
643 self
.assertEqual(data
['rrsets'], [])
645 def test_find_zone_by_name(self
):
646 name
= 'foo/' + unique_zone_name()
647 name
, payload
, data
= self
.create_zone(name
=name
)
648 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones?zone=" + name
))
651 self
.assertEquals(data
[0]['name'], name
)
653 def test_delete_slave_zone(self
):
654 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
655 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
658 def test_retrieve_slave_zone(self
):
659 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
660 print("payload:", payload
)
662 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/axfr-retrieve"))
664 print("status for axfr-retrieve:", data
)
665 self
.assertEqual(data
['result'], u
'Added retrieval request for \'' + payload
['name'] +
666 '\' from master 127.0.0.2')
668 def test_notify_master_zone(self
):
669 name
, payload
, data
= self
.create_zone(kind
='Master')
670 print("payload:", payload
)
672 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/notify"))
674 print("status for notify:", data
)
675 self
.assertEqual(data
['result'], 'Notification queued')
677 def test_get_zone_with_symbols(self
):
678 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
679 name
= payload
['name']
680 zone_id
= (name
.replace('/', '=2F'))
681 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
))
683 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
684 self
.assertIn(k
, data
)
686 self
.assertEquals(data
[k
], payload
[k
])
688 def test_get_zone(self
):
689 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
691 example_com
= [domain
for domain
in domains
if domain
['name'] == u
'example.com.'][0]
692 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + example_com
['id']))
693 self
.assert_success_json(r
)
695 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
696 self
.assertIn(k
, data
)
697 self
.assertEquals(data
['name'], 'example.com.')
699 def test_import_zone_broken(self
):
701 payload
['zone'] = """
702 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
703 flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
704 ;; WARNING: recursion requested but not available
706 ;; OPT PSEUDOSECTION:
707 ; EDNS: version: 0, flags:; udp: 1680
709 ;powerdns.com. IN SOA
712 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
713 powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
714 powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
715 powerdns-broken.com. 86400 IN A 82.94.213.34
716 powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
717 powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
718 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
720 payload
['name'] = 'powerdns-broken.com.'
721 payload
['kind'] = 'Master'
722 payload
['nameservers'] = []
723 r
= self
.session
.post(
724 self
.url("/api/v1/servers/localhost/zones"),
725 data
=json
.dumps(payload
),
726 headers
={'content-type': 'application/json'})
727 self
.assertEquals(r
.status_code
, 422)
729 def test_import_zone_axfr_outofzone(self
):
730 # Ensure we don't create out-of-zone records
731 name
= unique_zone_name()
733 payload
['zone'] = """
734 NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
735 NAME 3600 IN NS powerdnssec2.ds9a.nl.
736 example.org. 3600 IN AAAA 2001:888:2000:1d::2
737 NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
738 """.replace('NAME', name
)
739 payload
['name'] = name
740 payload
['kind'] = 'Master'
741 payload
['nameservers'] = []
742 r
= self
.session
.post(
743 self
.url("/api/v1/servers/localhost/zones"),
744 data
=json
.dumps(payload
),
745 headers
={'content-type': 'application/json'})
746 self
.assertEquals(r
.status_code
, 422)
747 self
.assertEqual(r
.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
749 def test_import_zone_axfr(self
):
751 payload
['zone'] = """
752 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
753 ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
754 ;; WARNING: recursion requested but not available
756 ;; OPT PSEUDOSECTION:
757 ; EDNS: version: 0, flags:; udp: 1680
759 ;powerdns.com. IN SOA
762 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
763 powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
764 powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
765 powerdns.com. 86400 IN A 82.94.213.34
766 powerdns.com. 3600 IN MX 0 xs.powerdns.com.
767 powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
768 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
770 payload
['name'] = 'powerdns.com.'
771 payload
['kind'] = 'Master'
772 payload
['nameservers'] = []
773 payload
['soa_edit_api'] = '' # turn off so exact SOA comparison works.
774 r
= self
.session
.post(
775 self
.url("/api/v1/servers/localhost/zones"),
776 data
=json
.dumps(payload
),
777 headers
={'content-type': 'application/json'})
778 self
.assert_success_json(r
)
780 self
.assertIn('name', data
)
784 {'content': 'powerdnssec1.ds9a.nl.'},
785 {'content': 'powerdnssec2.ds9a.nl.'},
788 {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
791 {'content': '0 xs.powerdns.com.'},
794 {'content': '82.94.213.34', 'name': 'powerdns.com.'},
797 {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
801 eq_zone_rrsets(data
['rrsets'], expected
)
803 # check content in DB is stored WITHOUT trailing dot.
804 dbrecs
= get_db_records(payload
['name'], 'NS')
805 dbrec
= next((dbrec
for dbrec
in dbrecs
if dbrec
['content'].startswith('powerdnssec1')))
806 self
.assertEqual(dbrec
['content'], 'powerdnssec1.ds9a.nl')
808 def test_import_zone_bind(self
):
810 payload
['zone'] = """
811 $TTL 86400 ; 24 hours could have been written as 24h or 1d
812 ; $TTL used for all RRs without explicit TTL value
814 @ 1D IN SOA ns1.example.org. hostmaster.example.org. (
821 IN NS ns1.example.org. ; in the domain
822 IN NS ns2.smokeyjoe.com. ; external to domain
823 IN MX 10 mail.another.com. ; external mail provider
824 ; server host definitions
825 ns1 IN A 192.168.0.1 ;name server definition
826 www IN A 192.168.0.2 ;web server definition
827 ftp IN CNAME www.example.org. ;ftp server definition
828 ; non server domain hosts
829 bill IN A 192.168.0.3
830 fred IN A 192.168.0.4
832 payload
['name'] = 'example.org.'
833 payload
['kind'] = 'Master'
834 payload
['nameservers'] = []
835 payload
['soa_edit_api'] = '' # turn off so exact SOA comparison works.
836 r
= self
.session
.post(
837 self
.url("/api/v1/servers/localhost/zones"),
838 data
=json
.dumps(payload
),
839 headers
={'content-type': 'application/json'})
840 self
.assert_success_json(r
)
842 self
.assertIn('name', data
)
846 {'content': 'ns1.example.org.'},
847 {'content': 'ns2.smokeyjoe.com.'},
850 {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
853 {'content': '10 mail.another.com.'},
856 {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
857 {'content': '192.168.0.2', 'name': 'www.example.org.'},
858 {'content': '192.168.0.3', 'name': 'bill.example.org.'},
859 {'content': '192.168.0.4', 'name': 'fred.example.org.'},
862 {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
866 eq_zone_rrsets(data
['rrsets'], expected
)
868 def test_export_zone_json(self
):
869 name
, payload
, zone
= self
.create_zone(nameservers
=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api
='')
871 r
= self
.session
.get(
872 self
.url("/api/v1/servers/localhost/zones/" + name
+ "/export"),
873 headers
={'accept': 'application/json;q=0.9,*/*;q=0.8'}
875 self
.assert_success_json(r
)
877 self
.assertIn('zone', data
)
878 expected_data
= [name
+ '\t3600\tIN\tNS\tns1.foo.com.',
879 name
+ '\t3600\tIN\tNS\tns2.foo.com.',
880 name
+ '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name
+
881 ' 0 10800 3600 604800 3600']
882 self
.assertEquals(data
['zone'].strip().split('\n'), expected_data
)
884 def test_export_zone_text(self
):
885 name
, payload
, zone
= self
.create_zone(nameservers
=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api
='')
887 r
= self
.session
.get(
888 self
.url("/api/v1/servers/localhost/zones/" + name
+ "/export"),
889 headers
={'accept': '*/*'}
891 data
= r
.text
.strip().split("\n")
892 expected_data
= [name
+ '\t3600\tIN\tNS\tns1.foo.com.',
893 name
+ '\t3600\tIN\tNS\tns2.foo.com.',
894 name
+ '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name
+
895 ' 0 10800 3600 604800 3600']
896 self
.assertEquals(data
, expected_data
)
898 def test_update_zone(self
):
899 name
, payload
, zone
= self
.create_zone()
900 name
= payload
['name']
901 # update, set as Master and enable SOA-EDIT-API
904 'masters': ['192.0.2.1', '192.0.2.2'],
905 'soa_edit_api': 'EPOCH',
908 r
= self
.session
.put(
909 self
.url("/api/v1/servers/localhost/zones/" + name
),
910 data
=json
.dumps(payload
),
911 headers
={'content-type': 'application/json'})
912 self
.assert_success(r
)
913 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
914 for k
in payload
.keys():
915 self
.assertIn(k
, data
)
916 self
.assertEquals(data
[k
], payload
[k
])
917 # update, back to Native and empty(off)
923 r
= self
.session
.put(
924 self
.url("/api/v1/servers/localhost/zones/" + name
),
925 data
=json
.dumps(payload
),
926 headers
={'content-type': 'application/json'})
927 self
.assert_success(r
)
928 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
929 for k
in payload
.keys():
930 self
.assertIn(k
, data
)
931 self
.assertEquals(data
[k
], payload
[k
])
933 def test_zone_rr_update(self
):
934 name
, payload
, zone
= self
.create_zone()
935 # do a replace (= update)
937 'changetype': 'replace',
943 "content": "ns1.bar.com.",
947 "content": "ns2-disabled.bar.com.",
952 payload
= {'rrsets': [rrset
]}
953 r
= self
.session
.patch(
954 self
.url("/api/v1/servers/localhost/zones/" + name
),
955 data
=json
.dumps(payload
),
956 headers
={'content-type': 'application/json'})
957 self
.assert_success(r
)
958 # verify that (only) the new record is there
959 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
960 self
.assertEquals(get_rrset(data
, name
, 'NS')['records'], rrset
['records'])
962 def test_zone_rr_update_mx(self
):
963 # Important to test with MX records, as they have a priority field, which must end up in the content field.
964 name
, payload
, zone
= self
.create_zone()
965 # do a replace (= update)
967 'changetype': 'replace',
973 "content": "10 mail.example.org.",
978 payload
= {'rrsets': [rrset
]}
979 r
= self
.session
.patch(
980 self
.url("/api/v1/servers/localhost/zones/" + name
),
981 data
=json
.dumps(payload
),
982 headers
={'content-type': 'application/json'})
983 self
.assert_success(r
)
984 # verify that (only) the new record is there
985 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
986 self
.assertEquals(get_rrset(data
, name
, 'MX')['records'], rrset
['records'])
988 def test_zone_rr_update_opt(self
):
989 name
, payload
, zone
= self
.create_zone()
990 # do a replace (= update)
992 'changetype': 'replace',
1003 payload
= {'rrsets': [rrset
]}
1004 r
= self
.session
.patch(
1005 self
.url("/api/v1/servers/localhost/zones/" + name
),
1006 data
=json
.dumps(payload
),
1007 headers
={'content-type': 'application/json'})
1008 self
.assertEquals(r
.status_code
, 422)
1009 self
.assertIn('OPT: invalid type given', r
.json()['error'])
1011 def test_zone_rr_update_multiple_rrsets(self
):
1012 name
, payload
, zone
= self
.create_zone()
1014 'changetype': 'replace',
1021 "content": "ns9999.example.com.",
1027 'changetype': 'replace',
1033 "content": "10 mx444.example.com.",
1038 payload
= {'rrsets': [rrset1
, rrset2
]}
1039 r
= self
.session
.patch(
1040 self
.url("/api/v1/servers/localhost/zones/" + name
),
1041 data
=json
.dumps(payload
),
1042 headers
={'content-type': 'application/json'})
1043 self
.assert_success(r
)
1044 # verify that all rrsets have been updated
1045 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1046 self
.assertEquals(get_rrset(data
, name
, 'NS')['records'], rrset1
['records'])
1047 self
.assertEquals(get_rrset(data
, name
, 'MX')['records'], rrset2
['records'])
1049 def test_zone_rr_update_duplicate_record(self
):
1050 name
, payload
, zone
= self
.create_zone()
1052 'changetype': 'replace',
1057 {"content": "ns9999.example.com.", "disabled": False},
1058 {"content": "ns9996.example.com.", "disabled": False},
1059 {"content": "ns9987.example.com.", "disabled": False},
1060 {"content": "ns9988.example.com.", "disabled": False},
1061 {"content": "ns9999.example.com.", "disabled": False},
1064 payload
= {'rrsets': [rrset
]}
1065 r
= self
.session
.patch(
1066 self
.url("/api/v1/servers/localhost/zones/" + name
),
1067 data
=json
.dumps(payload
),
1068 headers
={'content-type': 'application/json'})
1069 self
.assertEquals(r
.status_code
, 422)
1070 self
.assertIn('Duplicate record in RRset', r
.json()['error'])
1072 def test_zone_rr_update_duplicate_rrset(self
):
1073 name
, payload
, zone
= self
.create_zone()
1075 'changetype': 'replace',
1081 "content": "ns9999.example.com.",
1087 'changetype': 'replace',
1093 "content": "ns9998.example.com.",
1098 payload
= {'rrsets': [rrset1
, rrset2
]}
1099 r
= self
.session
.patch(
1100 self
.url("/api/v1/servers/localhost/zones/" + name
),
1101 data
=json
.dumps(payload
),
1102 headers
={'content-type': 'application/json'})
1103 self
.assertEquals(r
.status_code
, 422)
1104 self
.assertIn('Duplicate RRset', r
.json()['error'])
1106 def test_zone_rr_delete(self
):
1107 name
, payload
, zone
= self
.create_zone()
1108 # do a delete of all NS records (these are created with the zone)
1110 'changetype': 'delete',
1114 payload
= {'rrsets': [rrset
]}
1115 r
= self
.session
.patch(
1116 self
.url("/api/v1/servers/localhost/zones/" + name
),
1117 data
=json
.dumps(payload
),
1118 headers
={'content-type': 'application/json'})
1119 self
.assert_success(r
)
1120 # verify that the records are gone
1121 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1122 self
.assertIsNone(get_rrset(data
, name
, 'NS'))
1124 def test_zone_disable_reenable(self
):
1125 # This also tests that SOA-EDIT-API works.
1126 name
, payload
, zone
= self
.create_zone(soa_edit_api
='EPOCH')
1127 # disable zone by disabling SOA
1129 'changetype': 'replace',
1135 "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
1140 payload
= {'rrsets': [rrset
]}
1141 r
= self
.session
.patch(
1142 self
.url("/api/v1/servers/localhost/zones/" + name
),
1143 data
=json
.dumps(payload
),
1144 headers
={'content-type': 'application/json'})
1145 self
.assert_success(r
)
1146 # check SOA serial has been edited
1147 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1148 soa_serial1
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1149 self
.assertNotEquals(soa_serial1
, '1')
1150 # make sure domain is still in zone list (disabled SOA!)
1151 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
1153 self
.assertEquals(len([domain
for domain
in domains
if domain
['name'] == name
]), 1)
1154 # sleep 1sec to ensure the EPOCH value changes for the next request
1156 # verify that modifying it still works
1157 rrset
['records'][0]['disabled'] = False
1158 payload
= {'rrsets': [rrset
]}
1159 r
= self
.session
.patch(
1160 self
.url("/api/v1/servers/localhost/zones/" + name
),
1161 data
=json
.dumps(payload
),
1162 headers
={'content-type': 'application/json'})
1163 self
.assert_success(r
)
1164 # check SOA serial has been edited again
1165 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1166 soa_serial2
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1167 self
.assertNotEquals(soa_serial2
, '1')
1168 self
.assertNotEquals(soa_serial2
, soa_serial1
)
1170 def test_zone_rr_update_out_of_zone(self
):
1171 name
, payload
, zone
= self
.create_zone()
1172 # replace with qname mismatch
1174 'changetype': 'replace',
1175 'name': 'not-in-zone.',
1180 "content": "ns1.bar.com.",
1185 payload
= {'rrsets': [rrset
]}
1186 r
= self
.session
.patch(
1187 self
.url("/api/v1/servers/localhost/zones/" + name
),
1188 data
=json
.dumps(payload
),
1189 headers
={'content-type': 'application/json'})
1190 self
.assertEquals(r
.status_code
, 422)
1191 self
.assertIn('out of zone', r
.json()['error'])
1193 def test_zone_rr_update_restricted_chars(self
):
1194 name
, payload
, zone
= self
.create_zone()
1195 # replace with qname mismatch
1197 'changetype': 'replace',
1198 'name': 'test:' + name
,
1203 "content": "ns1.bar.com.",
1208 payload
= {'rrsets': [rrset
]}
1209 r
= self
.session
.patch(
1210 self
.url("/api/v1/servers/localhost/zones/" + name
),
1211 data
=json
.dumps(payload
),
1212 headers
={'content-type': 'application/json'})
1213 self
.assertEquals(r
.status_code
, 422)
1214 self
.assertIn('contains unsupported characters', r
.json()['error'])
1216 def test_rrset_unknown_type(self
):
1217 name
, payload
, zone
= self
.create_zone()
1219 'changetype': 'replace',
1225 "content": "4.3.2.1",
1230 payload
= {'rrsets': [rrset
]}
1231 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1232 headers
={'content-type': 'application/json'})
1233 self
.assertEquals(r
.status_code
, 422)
1234 self
.assertIn('unknown type', r
.json()['error'])
1236 def test_rrset_cname_and_other(self
):
1237 name
, payload
, zone
= self
.create_zone()
1239 'changetype': 'replace',
1245 "content": "example.org.",
1250 payload
= {'rrsets': [rrset
]}
1251 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1252 headers
={'content-type': 'application/json'})
1253 self
.assertEquals(r
.status_code
, 422)
1254 self
.assertIn('Conflicts with pre-existing non-CNAME RRset', r
.json()['error'])
1256 def test_rrset_other_and_cname(self
):
1257 name
, payload
, zone
= self
.create_zone()
1259 'changetype': 'replace',
1260 'name': 'sub.'+name
,
1265 "content": "example.org.",
1270 payload
= {'rrsets': [rrset
]}
1271 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1272 headers
={'content-type': 'application/json'})
1273 self
.assert_success(r
)
1275 'changetype': 'replace',
1276 'name': 'sub.'+name
,
1281 "content": "1.2.3.4",
1286 payload
= {'rrsets': [rrset
]}
1287 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1288 headers
={'content-type': 'application/json'})
1289 self
.assertEquals(r
.status_code
, 422)
1290 self
.assertIn('Conflicts with pre-existing CNAME RRset', r
.json()['error'])
1292 def test_create_zone_with_leading_space(self
):
1293 # Actual regression.
1294 name
, payload
, zone
= self
.create_zone()
1296 'changetype': 'replace',
1302 "content": " 4.3.2.1",
1307 payload
= {'rrsets': [rrset
]}
1308 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1309 headers
={'content-type': 'application/json'})
1310 self
.assertEquals(r
.status_code
, 422)
1311 self
.assertIn('Not in expected format', r
.json()['error'])
1313 def test_zone_rr_delete_out_of_zone(self
):
1314 name
, payload
, zone
= self
.create_zone()
1316 'changetype': 'delete',
1317 'name': 'not-in-zone.',
1320 payload
= {'rrsets': [rrset
]}
1321 r
= self
.session
.patch(
1322 self
.url("/api/v1/servers/localhost/zones/" + name
),
1323 data
=json
.dumps(payload
),
1324 headers
={'content-type': 'application/json'})
1326 self
.assert_success(r
) # succeed so users can fix their wrong, old data
def test_zone_delete(self):
    """Deleting a freshly created zone returns 204 with no Content-Type header."""
    # Only the zone name is needed; the payload and response body are unused.
    name, _, _ = self.create_zone()
    r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
    # assertEqual instead of the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(r.status_code, 204)
    # A 204 response carries no body, so no Content-Type header is expected.
    self.assertNotIn('Content-Type', r.headers)
1334 def test_zone_comment_create(self
):
1335 name
, payload
, zone
= self
.create_zone()
1337 'changetype': 'replace',
1344 'content': 'blah blah',
1348 'content': 'blah blah bleh',
1352 payload
= {'rrsets': [rrset
]}
1353 r
= self
.session
.patch(
1354 self
.url("/api/v1/servers/localhost/zones/" + name
),
1355 data
=json
.dumps(payload
),
1356 headers
={'content-type': 'application/json'})
1357 self
.assert_success(r
)
1358 # make sure the comments have been set, and that the NS
1359 # records are still present
1360 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1361 serverset
= get_rrset(data
, name
, 'NS')
1363 self
.assertNotEquals(serverset
['records'], [])
1364 self
.assertNotEquals(serverset
['comments'], [])
1365 # verify that modified_at has been set by pdns
1366 self
.assertNotEquals([c
for c
in serverset
['comments']][0]['modified_at'], 0)
1367 # verify that TTL is correct (regression test)
1368 self
.assertEquals(serverset
['ttl'], 3600)
1370 def test_zone_comment_delete(self
):
1371 # Test: Delete ONLY comments.
1372 name
, payload
, zone
= self
.create_zone()
1374 'changetype': 'replace',
1379 payload
= {'rrsets': [rrset
]}
1380 r
= self
.session
.patch(
1381 self
.url("/api/v1/servers/localhost/zones/" + name
),
1382 data
=json
.dumps(payload
),
1383 headers
={'content-type': 'application/json'})
1384 self
.assert_success(r
)
1385 # make sure the NS records are still present
1386 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1387 serverset
= get_rrset(data
, name
, 'NS')
1389 self
.assertNotEquals(serverset
['records'], [])
1390 self
.assertEquals(serverset
['comments'], [])
1392 def test_zone_comment_stay_intact(self
):
1393 # Test if comments on an rrset stay intact if the rrset is replaced
1394 name
, payload
, zone
= self
.create_zone()
1397 'changetype': 'replace',
1403 'content': 'oh hi there',
1408 payload
= {'rrsets': [rrset
]}
1409 r
= self
.session
.patch(
1410 self
.url("/api/v1/servers/localhost/zones/" + name
),
1411 data
=json
.dumps(payload
),
1412 headers
={'content-type': 'application/json'})
1413 self
.assert_success(r
)
1414 # replace rrset records
1416 'changetype': 'replace',
1422 "content": "ns1.bar.com.",
1427 payload2
= {'rrsets': [rrset2
]}
1428 r
= self
.session
.patch(
1429 self
.url("/api/v1/servers/localhost/zones/" + name
),
1430 data
=json
.dumps(payload2
),
1431 headers
={'content-type': 'application/json'})
1432 self
.assert_success(r
)
1433 # make sure the comments still exist
1434 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1435 serverset
= get_rrset(data
, name
, 'NS')
1437 self
.assertEquals(serverset
['records'], rrset2
['records'])
1438 self
.assertEquals(serverset
['comments'], rrset
['comments'])
1440 def test_zone_auto_ptr_ipv4_create(self
):
1441 revzone
= '4.2.192.in-addr.arpa.'
1442 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1443 name
= unique_zone_name()
1449 "content": "192.2.4.44",
1454 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
1455 del rrset
['records'][0]['set-ptr']
1456 self
.assertEquals(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
1457 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1458 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1460 self
.assertEquals(revsets
, [{
1461 u
'name': u
'44.4.2.192.in-addr.arpa.',
1470 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1471 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1473 def test_zone_auto_ptr_ipv4_update(self
):
1474 revzone
= '0.2.192.in-addr.arpa.'
1475 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1476 name
, payload
, zone
= self
.create_zone()
1478 'changetype': 'replace',
1484 "content": '192.2.0.2',
1490 payload
= {'rrsets': [rrset
]}
1491 r
= self
.session
.patch(
1492 self
.url("/api/v1/servers/localhost/zones/" + name
),
1493 data
=json
.dumps(payload
),
1494 headers
={'content-type': 'application/json'})
1495 self
.assert_success(r
)
1496 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1497 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1499 self
.assertEquals(revsets
, [{
1500 u
'name': u
'2.0.2.192.in-addr.arpa.',
1509 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1510 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1512 def test_zone_auto_ptr_ipv6_update(self
):
1514 revzone
= '8.b.d.0.1.0.0.2.ip6.arpa.'
1515 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1516 name
, payload
, zone
= self
.create_zone()
1518 'changetype': 'replace',
1524 "content": '2001:DB8::bb:aa',
1530 payload
= {'rrsets': [rrset
]}
1531 r
= self
.session
.patch(
1532 self
.url("/api/v1/servers/localhost/zones/" + name
),
1533 data
=json
.dumps(payload
),
1534 headers
={'content-type': 'application/json'})
1535 self
.assert_success(r
)
1536 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1537 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1539 self
.assertEquals(revsets
, [{
1540 u
'name': u
'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
1549 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1550 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1552 def test_search_rr_exact_zone(self
):
1553 name
= unique_zone_name()
1554 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
1555 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.')))
1556 self
.assert_success_json(r
)
1558 self
.assertEquals(r
.json(), [
1559 {u
'object_type': u
'zone', u
'name': name
, u
'zone_id': name
},
1560 {u
'content': u
'ns1.example.com.',
1561 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1562 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1563 {u
'content': u
'ns2.example.com.',
1564 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1565 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1566 {u
'content': u
'a.misconfigured.powerdns.server. hostmaster.'+name
+' 22 10800 3600 604800 3600',
1567 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1568 u
'ttl': 3600, u
'type': u
'SOA', u
'name': name
},
1571 def test_search_rr_substring(self
):
1572 name
= unique_zone_name()
1574 self
.create_zone(name
=name
)
1575 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
1576 self
.assert_success_json(r
)
1578 # should return zone, SOA, ns1, ns2
1579 self
.assertEquals(len(r
.json()), 4)
1581 def test_search_rr_case_insensitive(self
):
1582 name
= unique_zone_name()+'testsuffix.'
1583 self
.create_zone(name
=name
)
1584 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
1585 self
.assert_success_json(r
)
1587 # should return zone, SOA, ns1, ns2
1588 self
.assertEquals(len(r
.json()), 4)
1590 def test_search_after_rectify_with_ent(self
):
1591 name
= unique_zone_name()
1592 search
= name
.split('.')[0]
1594 "name": 'sub.sub.' + name
,
1598 "content": "4.3.2.1",
1602 self
.create_zone(name
=name
, rrsets
=[rrset
])
1603 pdnsutil_rectify(name
)
1604 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
1605 self
.assert_success_json(r
)
1607 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
1608 self
.assertEquals(len(r
.json()), 5)
1610 def test_cname_at_ent_place(self
):
1611 name
, payload
, zone
= self
.create_zone(api_rectify
=True)
1613 'changetype': 'replace',
1614 'name': 'sub2.sub1.' + name
,
1618 'content': "4.3.2.1",
1622 payload
= {'rrsets': [rrset
]}
1623 r
= self
.session
.patch(
1624 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
1625 data
=json
.dumps(payload
),
1626 headers
={'content-type': 'application/json'})
1627 self
.assertEquals(r
.status_code
, 204)
1629 'changetype': 'replace',
1630 'name': 'sub1.' + name
,
1634 'content': "www.example.org.",
1638 payload
= {'rrsets': [rrset
]}
1639 r
= self
.session
.patch(
1640 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
1641 data
=json
.dumps(payload
),
1642 headers
={'content-type': 'application/json'})
1643 self
.assertEquals(r
.status_code
, 204)
1645 def test_rrset_parameter_post_false(self
):
1646 name
= unique_zone_name()
1650 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
1652 r
= self
.session
.post(
1653 self
.url("/api/v1/servers/localhost/zones?rrsets=false"),
1654 data
=json
.dumps(payload
),
1655 headers
={'content-type': 'application/json'})
1657 self
.assert_success_json(r
)
1658 self
.assertEquals(r
.status_code
, 201)
1659 self
.assertEquals(r
.json().get('rrsets'), None)
1661 def test_rrset_false_parameter(self
):
1662 name
= unique_zone_name()
1663 self
.create_zone(name
=name
, kind
='Native')
1664 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/"+name
+"?rrsets=false"))
1665 self
.assert_success_json(r
)
1667 self
.assertEquals(r
.json().get('rrsets'), None)
1669 def test_rrset_true_parameter(self
):
1670 name
= unique_zone_name()
1671 self
.create_zone(name
=name
, kind
='Native')
1672 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/"+name
+"?rrsets=true"))
1673 self
.assert_success_json(r
)
1675 self
.assertEquals(len(r
.json().get('rrsets')), 2)
def test_wrong_rrset_parameter(self):
    """An unsupported value for the 'rrsets' query parameter is rejected with 422."""
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + "?rrsets=foobar"))
    # assertEqual instead of the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(r.status_code, 422)
    self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
1685 @unittest.skipIf(not is_auth(), "Not applicable")
1686 class AuthRootZone(ApiTestCase
, AuthZonesHelperMixin
):
1689 super(AuthRootZone
, self
).setUp()
1690 # zone name is not unique, so delete the zone before each individual test.
1691 self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/=2E"))
1693 def test_create_zone(self
):
1694 name
, payload
, data
= self
.create_zone(name
='.', serial
=22, soa_edit_api
='')
1695 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
1696 self
.assertIn(k
, data
)
1698 self
.assertEquals(data
[k
], payload
[k
])
1699 # validate generated SOA
1700 rec
= get_first_rec(data
, '.', 'SOA')
1703 "a.misconfigured.powerdns.server. hostmaster. " + str(payload
['serial']) +
1704 " 10800 3600 604800 3600"
1706 # Regression test: verify zone list works
1707 zonelist
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones")).json()
1708 print("zonelist:", zonelist
)
1709 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
1710 # Also test that fetching the zone works.
1711 print("id:", data
['id'])
1712 self
.assertEquals(data
['id'], '=2E')
1713 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id'])).json()
1714 print("zone (fetched):", data
)
1715 for k
in ('name', 'kind'):
1716 self
.assertIn(k
, data
)
1717 self
.assertEquals(data
[k
], payload
[k
])
1718 self
.assertEqual(data
['rrsets'][0]['name'], '.')
1720 def test_update_zone(self
):
1721 name
, payload
, zone
= self
.create_zone(name
='.')
1723 # update, set as Master and enable SOA-EDIT-API
1726 'masters': ['192.0.2.1', '192.0.2.2'],
1727 'soa_edit_api': 'EPOCH',
1730 r
= self
.session
.put(
1731 self
.url("/api/v1/servers/localhost/zones/" + zone_id
),
1732 data
=json
.dumps(payload
),
1733 headers
={'content-type': 'application/json'})
1734 self
.assert_success(r
)
1735 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
)).json()
1736 for k
in payload
.keys():
1737 self
.assertIn(k
, data
)
1738 self
.assertEquals(data
[k
], payload
[k
])
1739 # update, back to Native and empty(off)
1745 r
= self
.session
.put(
1746 self
.url("/api/v1/servers/localhost/zones/" + zone_id
),
1747 data
=json
.dumps(payload
),
1748 headers
={'content-type': 'application/json'})
1749 self
.assert_success(r
)
1750 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
)).json()
1751 for k
in payload
.keys():
1752 self
.assertIn(k
, data
)
1753 self
.assertEquals(data
[k
], payload
[k
])
1756 @unittest.skipIf(not is_recursor(), "Not applicable")
1757 class RecursorZones(ApiTestCase
):
1759 def create_zone(self
, name
=None, kind
=None, rd
=False, servers
=None):
1761 name
= unique_zone_name()
1768 'recursion_desired': rd
1770 r
= self
.session
.post(
1771 self
.url("/api/v1/servers/localhost/zones"),
1772 data
=json
.dumps(payload
),
1773 headers
={'content-type': 'application/json'})
1774 self
.assert_success_json(r
)
1775 return payload
, r
.json()
def test_create_auth_zone(self):
    """Every field sent when creating a Native zone is echoed back unchanged."""
    payload, data = self.create_zone(kind='Native')
    # Iterate key/value pairs directly instead of re-indexing the payload;
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    for key, expected in payload.items():
        self.assertEqual(data[key], expected)
1782 def test_create_zone_no_name(self
):
1786 'servers': ['8.8.8.8'],
1787 'recursion_desired': False,
1790 r
= self
.session
.post(
1791 self
.url("/api/v1/servers/localhost/zones"),
1792 data
=json
.dumps(payload
),
1793 headers
={'content-type': 'application/json'})
1794 self
.assertEquals(r
.status_code
, 422)
1795 self
.assertIn('is not canonical', r
.json()['error'])
def test_create_forwarded_zone(self):
    """A Forwarded zone (recursion_desired off) echoes the payload, with servers normalized."""
    payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
    # return values are normalized: the expected server entry gets ':53' appended
    payload['servers'][0] += ':53'
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    for key, expected in payload.items():
        self.assertEqual(data[key], expected)
def test_create_forwarded_rd_zone(self):
    """A Forwarded zone with recursion_desired on echoes the payload, with servers normalized."""
    payload, data = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
    # return values are normalized: the expected server entry gets ':53' appended
    payload['servers'][0] += ':53'
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    for key, expected in payload.items():
        self.assertEqual(data[key], expected)
def test_create_auth_zone_with_symbols(self):
    """A zone name containing '/' is echoed back, and its id has '/' encoded as '=2F'."""
    payload, data = self.create_zone(name='foo/bar.' + unique_zone_name(), kind='Native')
    # The expected id is the zone name with every '/' replaced by '=2F'.
    expected_id = payload['name'].replace('/', '=2F')
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    for key, expected in payload.items():
        self.assertEqual(data[key], expected)
    self.assertEqual(data['id'], expected_id)
1818 def test_rename_auth_zone(self
):
1819 payload
, data
= self
.create_zone(kind
='Native')
1820 name
= payload
['name']
1823 'name': 'renamed-'+name
,
1825 'recursion_desired': False
1827 r
= self
.session
.put(
1828 self
.url("/api/v1/servers/localhost/zones/" + name
),
1829 data
=json
.dumps(payload
),
1830 headers
={'content-type': 'application/json'})
1831 self
.assert_success(r
)
1832 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + payload
['name'])).json()
1833 for k
in payload
.keys():
1834 self
.assertEquals(data
[k
], payload
[k
])
1836 def test_zone_delete(self
):
1837 payload
, zone
= self
.create_zone(kind
='Native')
1838 name
= payload
['name']
1839 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + name
))
1840 self
.assertEquals(r
.status_code
, 204)
1841 self
.assertNotIn('Content-Type', r
.headers
)
1843 def test_search_rr_exact_zone(self
):
1844 name
= unique_zone_name()
1845 self
.create_zone(name
=name
, kind
='Native')
1846 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
))
1847 self
.assert_success_json(r
)
1849 self
.assertEquals(r
.json(), [{u
'type': u
'zone', u
'name': name
, u
'zone_id': name
}])
1851 def test_search_rr_substring(self
):
1852 name
= 'search-rr-zone.name.'
1853 self
.create_zone(name
=name
, kind
='Native')
1854 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
1855 self
.assert_success_json(r
)
1857 # should return zone, SOA
1858 self
.assertEquals(len(r
.json()), 2)
1860 @unittest.skipIf(not is_auth(), "Not applicable")
1861 class AuthZoneKeys(ApiTestCase
, AuthZonesHelperMixin
):
1863 def test_get_keys(self
):
1864 r
= self
.session
.get(
1865 self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
1866 self
.assert_success_json(r
)
1868 self
.assertGreater(len(keys
), 0)
1870 key0
= deepcopy(keys
[0])
1874 u
'algorithm': u
'ECDSAP256SHA256',
1877 u
'type': u
'Cryptokey',
1881 self
.assertEquals(key0
, expected
)
1883 keydata
= keys
[0]['dnskey'].split()
1884 self
.assertEqual(len(keydata
), 4)