]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
1 from __future__
import print_function
5 from copy
import deepcopy
6 from parameterized
import parameterized
7 from pprint
import pprint
8 from test_helper
import ApiTestCase
, unique_zone_name
, is_auth
, is_recursor
, get_db_records
, pdnsutil_rectify
11 def get_rrset(data
, qname
, qtype
):
12 for rrset
in data
['rrsets']:
13 if rrset
['name'] == qname
and rrset
['type'] == qtype
:
18 def get_first_rec(data
, qname
, qtype
):
19 rrset
= get_rrset(data
, qname
, qtype
)
21 return rrset
['records'][0]
25 def eq_zone_rrsets(rrsets
, expected
):
28 for type_
, expected_records
in expected
.items():
30 data_got
[type_
] = set()
31 data_expected
[type_
] = set()
32 uses_name
= any(['name' in expected_record
for expected_record
in expected_records
])
33 # minify + convert received data
34 for rrset
in [rrset
for rrset
in rrsets
if rrset
['type'] == type_
]:
36 for r
in rrset
['records']:
37 data_got
[type_
].add((rrset
['name'] if uses_name
else '@', rrset
['type'], r
['content']))
38 # minify expected data
39 for r
in expected_records
:
40 data_expected
[type_
].add((r
['name'] if uses_name
else '@', type_
, r
['content']))
42 print("eq_zone_rrsets: got:")
44 print("eq_zone_rrsets: expected:")
47 assert data_got
== data_expected
, "%r != %r" % (data_got
, data_expected
)
50 class Zones(ApiTestCase
):
52 def _test_list_zones(self
, dnssec
=True):
53 path
= "/api/v1/servers/localhost/zones"
55 path
= path
+ "?dnssec=false"
56 r
= self
.session
.get(self
.url(path
))
57 self
.assert_success_json(r
)
59 example_com
= [domain
for domain
in domains
if domain
['name'] in ('example.com', 'example.com.')]
60 self
.assertEquals(len(example_com
), 1)
61 example_com
= example_com
[0]
63 required_fields
= ['id', 'url', 'name', 'kind']
65 required_fields
= required_fields
+ ['masters', 'last_check', 'notified_serial', 'serial', 'account']
67 required_fields
= required_fields
= ['dnssec', 'edited_serial']
68 self
.assertNotEquals(example_com
['serial'], 0)
70 self
.assertNotIn('dnssec', example_com
)
72 required_fields
= required_fields
+ ['recursion_desired', 'servers']
73 for field
in required_fields
:
74 self
.assertIn(field
, example_com
)
76 def test_list_zones_with_dnssec(self
):
78 self
._test
_list
_zones
(True)
80 def test_list_zones_without_dnssec(self
):
81 self
._test
_list
_zones
(False)
83 class AuthZonesHelperMixin(object):
84 def create_zone(self
, name
=None, **kwargs
):
86 name
= unique_zone_name()
90 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
92 for k
, v
in kwargs
.items():
97 print("sending", payload
)
98 r
= self
.session
.post(
99 self
.url("/api/v1/servers/localhost/zones"),
100 data
=json
.dumps(payload
),
101 headers
={'content-type': 'application/json'})
102 self
.assert_success_json(r
)
103 self
.assertEquals(r
.status_code
, 201)
105 print("reply", reply
)
106 return name
, payload
, reply
109 @unittest.skipIf(not is_auth(), "Not applicable")
110 class AuthZones(ApiTestCase
, AuthZonesHelperMixin
):
112 def test_create_zone(self
):
113 # soa_edit_api has a default, override with empty for this test
114 name
, payload
, data
= self
.create_zone(serial
=22, soa_edit_api
='')
115 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'edited_serial', 'soa_edit_api', 'soa_edit', 'account'):
116 self
.assertIn(k
, data
)
118 self
.assertEquals(data
[k
], payload
[k
])
119 # validate generated SOA
120 expected_soa
= "a.misconfigured.powerdns.server. hostmaster." + name
+ " " + \
121 str(payload
['serial']) + " 10800 3600 604800 3600"
123 get_first_rec(data
, name
, 'SOA')['content'],
126 # Because we had confusion about dots, check that the DB is without dots.
127 dbrecs
= get_db_records(name
, 'SOA')
128 self
.assertEqual(dbrecs
[0]['content'], expected_soa
.replace('. ', ' '))
129 self
.assertNotEquals(data
['serial'], data
['edited_serial'])
131 def test_create_zone_with_soa_edit_api(self
):
132 # soa_edit_api wins over serial
133 name
, payload
, data
= self
.create_zone(soa_edit_api
='EPOCH', serial
=10)
134 for k
in ('soa_edit_api', ):
135 self
.assertIn(k
, data
)
137 self
.assertEquals(data
[k
], payload
[k
])
138 # generated EPOCH serial surely is > fixed serial we passed in
140 self
.assertGreater(data
['serial'], payload
['serial'])
141 soa_serial
= int(get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2])
142 self
.assertGreater(soa_serial
, payload
['serial'])
143 self
.assertEquals(soa_serial
, data
['serial'])
145 def test_create_zone_with_account(self
):
146 # soa_edit_api wins over serial
147 name
, payload
, data
= self
.create_zone(account
='anaccount', serial
=10)
149 for k
in ('account', ):
150 self
.assertIn(k
, data
)
152 self
.assertEquals(data
[k
], payload
[k
])
154 def test_create_zone_default_soa_edit_api(self
):
155 name
, payload
, data
= self
.create_zone()
157 self
.assertEquals(data
['soa_edit_api'], 'DEFAULT')
159 def test_create_zone_exists(self
):
160 name
, payload
, data
= self
.create_zone()
167 r
= self
.session
.post(
168 self
.url("/api/v1/servers/localhost/zones"),
169 data
=json
.dumps(payload
),
170 headers
={'content-type': 'application/json'})
171 self
.assertEquals(r
.status_code
, 409) # Conflict - already exists
173 def test_create_zone_with_soa_edit(self
):
174 name
, payload
, data
= self
.create_zone(soa_edit
='INCEPTION-INCREMENT', soa_edit_api
='SOA-EDIT-INCREASE')
176 self
.assertEquals(data
['soa_edit'], 'INCEPTION-INCREMENT')
177 self
.assertEquals(data
['soa_edit_api'], 'SOA-EDIT-INCREASE')
178 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
179 # These particular settings lead to the first serial set to YYYYMMDD01.
180 self
.assertEquals(soa_serial
[-2:], '01')
182 'changetype': 'replace',
188 "content": "127.0.0.1",
193 payload
= {'rrsets': [rrset
]}
195 self
.url("/api/v1/servers/localhost/zones/" + data
['id']),
196 data
=json
.dumps(payload
),
197 headers
={'content-type': 'application/json'})
198 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
200 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
201 self
.assertEquals(soa_serial
[-2:], '02')
203 def test_create_zone_with_records(self
):
204 name
= unique_zone_name()
210 "content": "4.3.2.1",
214 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
215 # check our record has appeared
216 self
.assertEquals(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
218 def test_create_zone_with_wildcard_records(self
):
219 name
= unique_zone_name()
225 "content": "4.3.2.1",
229 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
230 # check our record has appeared
231 self
.assertEquals(get_rrset(data
, rrset
['name'], 'A')['records'], rrset
['records'])
233 def test_create_zone_with_comments(self
):
234 name
= unique_zone_name()
238 "type": "soa", # test uppercasing of type, too.
241 "content": "blah blah",
242 "modified_at": 11112,
250 "content": "2001:DB8::1",
254 "account": "test AAAA",
255 "content": "blah blah AAAA",
256 "modified_at": 11112,
264 "content": "\"test TXT\"",
273 "content": "192.0.2.1",
278 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=rrsets
)
279 # NS records have been created
280 self
.assertEquals(len(data
['rrsets']), len(rrsets
) + 1)
281 # check our comment has appeared
282 self
.assertEquals(get_rrset(data
, name
, 'SOA')['comments'], rrsets
[0]['comments'])
283 self
.assertEquals(get_rrset(data
, name
, 'A')['comments'], [])
284 self
.assertEquals(get_rrset(data
, name
, 'TXT')['comments'], [])
285 self
.assertEquals(get_rrset(data
, name
, 'AAAA')['comments'], rrsets
[1]['comments'])
287 def test_create_zone_uncanonical_nameservers(self
):
288 name
= unique_zone_name()
292 'nameservers': ['uncanon.example.com']
295 r
= self
.session
.post(
296 self
.url("/api/v1/servers/localhost/zones"),
297 data
=json
.dumps(payload
),
298 headers
={'content-type': 'application/json'})
299 self
.assertEquals(r
.status_code
, 422)
300 self
.assertIn('Nameserver is not canonical', r
.json()['error'])
302 def test_create_auth_zone_no_name(self
):
303 name
= unique_zone_name()
309 r
= self
.session
.post(
310 self
.url("/api/v1/servers/localhost/zones"),
311 data
=json
.dumps(payload
),
312 headers
={'content-type': 'application/json'})
313 self
.assertEquals(r
.status_code
, 422)
314 self
.assertIn('is not canonical', r
.json()['error'])
316 def test_create_zone_with_custom_soa(self
):
317 name
= unique_zone_name()
318 content
= u
"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
321 "type": "soa", # test uppercasing of type, too.
328 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
], soa_edit_api
='')
329 self
.assertEquals(get_rrset(data
, name
, 'SOA')['records'], rrset
['records'])
330 dbrecs
= get_db_records(name
, 'SOA')
331 self
.assertEqual(dbrecs
[0]['content'], content
.replace('. ', ' '))
333 def test_create_zone_double_dot(self
):
334 name
= 'test..' + unique_zone_name()
338 'nameservers': ['ns1.example.com.']
341 r
= self
.session
.post(
342 self
.url("/api/v1/servers/localhost/zones"),
343 data
=json
.dumps(payload
),
344 headers
={'content-type': 'application/json'})
345 self
.assertEquals(r
.status_code
, 422)
346 self
.assertIn('Unable to parse DNS Name', r
.json()['error'])
348 def test_create_zone_restricted_chars(self
):
349 name
= 'test:' + unique_zone_name() # : isn't good as a name.
353 'nameservers': ['ns1.example.com']
356 r
= self
.session
.post(
357 self
.url("/api/v1/servers/localhost/zones"),
358 data
=json
.dumps(payload
),
359 headers
={'content-type': 'application/json'})
360 self
.assertEquals(r
.status_code
, 422)
361 self
.assertIn('contains unsupported characters', r
.json()['error'])
363 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self
):
364 name
= unique_zone_name()
370 "content": "ns2.example.com.",
377 'nameservers': ['ns1.example.com.'],
381 r
= self
.session
.post(
382 self
.url("/api/v1/servers/localhost/zones"),
383 data
=json
.dumps(payload
),
384 headers
={'content-type': 'application/json'})
385 self
.assertEquals(r
.status_code
, 422)
386 self
.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r
.json()['error'])
388 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self
):
389 name
= unique_zone_name()
391 "name": 'subzone.'+name
,
395 "content": "ns2.example.com.",
402 'nameservers': ['ns1.example.com.'],
406 r
= self
.session
.post(
407 self
.url("/api/v1/servers/localhost/zones"),
408 data
=json
.dumps(payload
),
409 headers
={'content-type': 'application/json'})
410 self
.assert_success_json(r
)
412 def test_create_zone_with_symbols(self
):
413 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
414 name
= payload
['name']
415 expected_id
= name
.replace('/', '=2F')
416 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
417 self
.assertIn(k
, data
)
419 self
.assertEquals(data
[k
], payload
[k
])
420 self
.assertEquals(data
['id'], expected_id
)
421 dbrecs
= get_db_records(name
, 'SOA')
422 self
.assertEqual(dbrecs
[0]['name'], name
.rstrip('.'))
424 def test_create_zone_with_nameservers_non_string(self
):
425 # ensure we don't crash
426 name
= unique_zone_name()
430 'nameservers': [{'a': 'ns1.example.com'}] # invalid
433 r
= self
.session
.post(
434 self
.url("/api/v1/servers/localhost/zones"),
435 data
=json
.dumps(payload
),
436 headers
={'content-type': 'application/json'})
437 self
.assertEquals(r
.status_code
, 422)
439 def test_create_zone_with_dnssec(self
):
441 Create a zone with "dnssec" set and see if a key was made.
443 name
= unique_zone_name()
444 name
, payload
, data
= self
.create_zone(dnssec
=True)
446 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
448 for k
in ('dnssec', ):
449 self
.assertIn(k
, data
)
451 self
.assertEquals(data
[k
], payload
[k
])
453 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
459 self
.assertEquals(r
.status_code
, 200)
460 self
.assertEquals(len(keys
), 1)
461 self
.assertEquals(keys
[0]['type'], 'Cryptokey')
462 self
.assertEquals(keys
[0]['active'], True)
463 self
.assertEquals(keys
[0]['keytype'], 'csk')
465 def test_create_zone_with_dnssec_disable_dnssec(self
):
467 Create a zone with "dnssec", then set "dnssec" to false and see if the
470 name
= unique_zone_name()
471 name
, payload
, data
= self
.create_zone(dnssec
=True)
473 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
474 data
=json
.dumps({'dnssec': False}))
475 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
479 self
.assertEquals(r
.status_code
, 200)
480 self
.assertEquals(zoneinfo
['dnssec'], False)
482 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/cryptokeys'))
486 self
.assertEquals(r
.status_code
, 200)
487 self
.assertEquals(len(keys
), 0)
489 def test_create_zone_with_nsec3param(self
):
491 Create a zone with "nsec3param" set and see if the metadata was added.
493 name
= unique_zone_name()
494 nsec3param
= '1 0 500 aabbccddeeff'
495 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
)
497 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
499 for k
in ('dnssec', 'nsec3param'):
500 self
.assertIn(k
, data
)
502 self
.assertEquals(data
[k
], payload
[k
])
504 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3PARAM'))
510 self
.assertEquals(r
.status_code
, 200)
511 self
.assertEquals(len(data
['metadata']), 1)
512 self
.assertEquals(data
['kind'], 'NSEC3PARAM')
513 self
.assertEquals(data
['metadata'][0], nsec3param
)
515 def test_create_zone_with_nsec3narrow(self
):
517 Create a zone with "nsec3narrow" set and see if the metadata was added.
519 name
= unique_zone_name()
520 nsec3param
= '1 0 500 aabbccddeeff'
521 name
, payload
, data
= self
.create_zone(dnssec
=True, nsec3param
=nsec3param
,
524 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
526 for k
in ('dnssec', 'nsec3param', 'nsec3narrow'):
527 self
.assertIn(k
, data
)
529 self
.assertEquals(data
[k
], payload
[k
])
531 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
+ '/metadata/NSEC3NARROW'))
537 self
.assertEquals(r
.status_code
, 200)
538 self
.assertEquals(len(data
['metadata']), 1)
539 self
.assertEquals(data
['kind'], 'NSEC3NARROW')
540 self
.assertEquals(data
['metadata'][0], '1')
542 def test_create_zone_dnssec_serial(self
):
544 Create a zone set/unset "dnssec" and see if the serial was increased
547 name
= unique_zone_name()
548 name
, payload
, data
= self
.create_zone()
550 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
551 self
.assertEquals(soa_serial
[-2:], '01')
553 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
554 data
=json
.dumps({'dnssec': True}))
555 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
558 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
560 self
.assertEquals(r
.status_code
, 200)
561 self
.assertEquals(soa_serial
[-2:], '02')
563 self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + name
),
564 data
=json
.dumps({'dnssec': False}))
565 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
))
568 soa_serial
= get_first_rec(data
, name
, 'SOA')['content'].split(' ')[2]
570 self
.assertEquals(r
.status_code
, 200)
571 self
.assertEquals(soa_serial
[-2:], '03')
573 def test_zone_absolute_url(self
):
574 name
, payload
, data
= self
.create_zone()
575 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
578 self
.assertTrue(rdata
[0]['url'].startswith('/api/v'))
580 def test_create_zone_metadata(self
):
581 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
582 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
583 data
=json
.dumps(payload_metadata
))
585 self
.assertEquals(r
.status_code
, 201)
586 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
588 def test_create_zone_metadata_kind(self
):
589 payload_metadata
= {"metadata": ["127.0.0.2"]}
590 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
591 data
=json
.dumps(payload_metadata
))
593 self
.assertEquals(r
.status_code
, 200)
594 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
596 def test_create_protected_zone_metadata(self
):
597 # test whether it prevents modification of certain kinds
598 for k
in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
599 payload
= {"metadata": ["FOO", "BAR"]}
600 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k
),
601 data
=json
.dumps(payload
))
602 self
.assertEquals(r
.status_code
, 422)
604 def test_retrieve_zone_metadata(self
):
605 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
606 self
.session
.post(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"),
607 data
=json
.dumps(payload_metadata
))
608 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata"))
610 self
.assertEquals(r
.status_code
, 200)
611 self
.assertIn(payload_metadata
, rdata
)
613 def test_delete_zone_metadata(self
):
614 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
615 self
.assertEquals(r
.status_code
, 200)
616 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
618 self
.assertEquals(r
.status_code
, 200)
619 self
.assertEquals(rdata
["metadata"], [])
621 def test_create_external_zone_metadata(self
):
622 payload_metadata
= {"metadata": ["My very important message"]}
623 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
624 data
=json
.dumps(payload_metadata
))
625 self
.assertEquals(r
.status_code
, 200)
627 self
.assertEquals(rdata
["metadata"], payload_metadata
["metadata"])
629 def test_create_metadata_in_non_existent_zone(self
):
630 payload_metadata
= {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
631 r
= self
.session
.post(self
.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
632 data
=json
.dumps(payload_metadata
))
633 self
.assertEquals(r
.status_code
, 404)
634 # Note: errors should probably contain json (see #5988)
635 # self.assertIn('Could not find domain ', r.json()['error'])
637 def test_create_slave_zone(self
):
638 # Test that nameservers can be absent for slave zones.
639 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
640 for k
in ('name', 'masters', 'kind'):
641 self
.assertIn(k
, data
)
642 self
.assertEquals(data
[k
], payload
[k
])
643 print("payload:", payload
)
645 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
646 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
648 print("zonelist:", zonelist
)
649 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
650 # Also test that fetching the zone works.
651 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
653 print("zone (fetched):", data
)
654 for k
in ('name', 'masters', 'kind'):
655 self
.assertIn(k
, data
)
656 self
.assertEquals(data
[k
], payload
[k
])
657 self
.assertEqual(data
['serial'], 0)
658 self
.assertEqual(data
['rrsets'], [])
660 def test_find_zone_by_name(self
):
661 name
= 'foo/' + unique_zone_name()
662 name
, payload
, data
= self
.create_zone(name
=name
)
663 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones?zone=" + name
))
666 self
.assertEquals(data
[0]['name'], name
)
668 def test_delete_slave_zone(self
):
669 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
670 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + data
['id']))
673 def test_retrieve_slave_zone(self
):
674 name
, payload
, data
= self
.create_zone(kind
='Slave', nameservers
=None, masters
=['127.0.0.2'])
675 print("payload:", payload
)
677 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/axfr-retrieve"))
679 print("status for axfr-retrieve:", data
)
680 self
.assertEqual(data
['result'], u
'Added retrieval request for \'' + payload
['name'] +
681 '\' from master 127.0.0.2')
683 def test_notify_master_zone(self
):
684 name
, payload
, data
= self
.create_zone(kind
='Master')
685 print("payload:", payload
)
687 r
= self
.session
.put(self
.url("/api/v1/servers/localhost/zones/" + data
['id'] + "/notify"))
689 print("status for notify:", data
)
690 self
.assertEqual(data
['result'], 'Notification queued')
692 def test_get_zone_with_symbols(self
):
693 name
, payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name())
694 name
= payload
['name']
695 zone_id
= (name
.replace('/', '=2F'))
696 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
))
698 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
699 self
.assertIn(k
, data
)
701 self
.assertEquals(data
[k
], payload
[k
])
703 def test_get_zone(self
):
704 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
706 example_com
= [domain
for domain
in domains
if domain
['name'] == u
'example.com.'][0]
707 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + example_com
['id']))
708 self
.assert_success_json(r
)
710 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
711 self
.assertIn(k
, data
)
712 self
.assertEquals(data
['name'], 'example.com.')
714 def test_import_zone_broken(self
):
716 'name': 'powerdns-broken.com',
720 payload
['zone'] = """
721 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
722 flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
723 ;; WARNING: recursion requested but not available
725 ;; OPT PSEUDOSECTION:
726 ; EDNS: version: 0, flags:; udp: 1680
728 ;powerdns.com. IN SOA
731 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
732 powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
733 powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
734 powerdns-broken.com. 86400 IN A 82.94.213.34
735 powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
736 powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
737 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
739 r
= self
.session
.post(
740 self
.url("/api/v1/servers/localhost/zones"),
741 data
=json
.dumps(payload
),
742 headers
={'content-type': 'application/json'})
743 self
.assertEquals(r
.status_code
, 422)
745 def test_import_zone_axfr_outofzone(self
):
746 # Ensure we don't create out-of-zone records
748 'name': unique_zone_name(),
752 payload
['zone'] = """
753 %NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
754 %NAME% 3600 IN NS powerdnssec2.ds9a.nl.
755 example.org. 3600 IN AAAA 2001:888:2000:1d::2
756 %NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
757 """.replace('%NAME%', payload
['name'])
758 r
= self
.session
.post(
759 self
.url("/api/v1/servers/localhost/zones"),
760 data
=json
.dumps(payload
),
761 headers
={'content-type': 'application/json'})
762 self
.assertEquals(r
.status_code
, 422)
763 self
.assertEqual(r
.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
765 def test_import_zone_axfr(self
):
767 'name': 'powerdns.com.',
770 'soa_edit_api': '', # turn off so exact SOA comparison works.
772 payload
['zone'] = """
773 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
774 ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
775 ;; WARNING: recursion requested but not available
777 ;; OPT PSEUDOSECTION:
778 ; EDNS: version: 0, flags:; udp: 1680
780 ;powerdns.com. IN SOA
783 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
784 powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
785 powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
786 powerdns.com. 86400 IN A 82.94.213.34
787 powerdns.com. 3600 IN MX 0 xs.powerdns.com.
788 powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
789 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
791 r
= self
.session
.post(
792 self
.url("/api/v1/servers/localhost/zones"),
793 data
=json
.dumps(payload
),
794 headers
={'content-type': 'application/json'})
795 self
.assert_success_json(r
)
797 self
.assertIn('name', data
)
801 {'content': 'powerdnssec1.ds9a.nl.'},
802 {'content': 'powerdnssec2.ds9a.nl.'},
805 {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
808 {'content': '0 xs.powerdns.com.'},
811 {'content': '82.94.213.34', 'name': 'powerdns.com.'},
814 {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
818 eq_zone_rrsets(data
['rrsets'], expected
)
820 # check content in DB is stored WITHOUT trailing dot.
821 dbrecs
= get_db_records(payload
['name'], 'NS')
822 dbrec
= next((dbrec
for dbrec
in dbrecs
if dbrec
['content'].startswith('powerdnssec1')))
823 self
.assertEqual(dbrec
['content'], 'powerdnssec1.ds9a.nl')
825 def test_import_zone_bind(self
):
827 'name': 'example.org.',
830 'soa_edit_api': '', # turn off so exact SOA comparison works.
832 payload
['zone'] = """
833 $TTL 86400 ; 24 hours could have been written as 24h or 1d
834 ; $TTL used for all RRs without explicit TTL value
836 @ 1D IN SOA ns1.example.org. hostmaster.example.org. (
843 IN NS ns1.example.org. ; in the domain
844 IN NS ns2.smokeyjoe.com. ; external to domain
845 IN MX 10 mail.another.com. ; external mail provider
846 ; server host definitions
847 ns1 IN A 192.168.0.1 ;name server definition
848 www IN A 192.168.0.2 ;web server definition
849 ftp IN CNAME www.example.org. ;ftp server definition
850 ; non server domain hosts
851 bill IN A 192.168.0.3
852 fred IN A 192.168.0.4
854 r
= self
.session
.post(
855 self
.url("/api/v1/servers/localhost/zones"),
856 data
=json
.dumps(payload
),
857 headers
={'content-type': 'application/json'})
858 self
.assert_success_json(r
)
860 self
.assertIn('name', data
)
864 {'content': 'ns1.example.org.'},
865 {'content': 'ns2.smokeyjoe.com.'},
868 {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
871 {'content': '10 mail.another.com.'},
874 {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
875 {'content': '192.168.0.2', 'name': 'www.example.org.'},
876 {'content': '192.168.0.3', 'name': 'bill.example.org.'},
877 {'content': '192.168.0.4', 'name': 'fred.example.org.'},
880 {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
884 eq_zone_rrsets(data
['rrsets'], expected
)
886 def test_import_zone_bind_cname_apex(self
):
888 'name': unique_zone_name(),
892 payload
['zone'] = """
894 @ IN SOA ns1.example.org. hostmaster.example.org. (2002022401 3H 15 1W 3H)
895 @ IN NS ns1.example.org.
896 @ IN NS ns2.smokeyjoe.com.
897 @ IN CNAME www.example.org.
898 """.replace('%NAME%', payload
['name'])
899 r
= self
.session
.post(
900 self
.url("/api/v1/servers/localhost/zones"),
901 data
=json
.dumps(payload
),
902 headers
={'content-type': 'application/json'})
903 self
.assertEquals(r
.status_code
, 422)
904 self
.assertIn('Conflicts with another RRset', r
.json()['error'])
906 def test_export_zone_json(self
):
907 name
, payload
, zone
= self
.create_zone(nameservers
=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api
='')
909 r
= self
.session
.get(
910 self
.url("/api/v1/servers/localhost/zones/" + name
+ "/export"),
911 headers
={'accept': 'application/json;q=0.9,*/*;q=0.8'}
913 self
.assert_success_json(r
)
915 self
.assertIn('zone', data
)
916 expected_data
= [name
+ '\t3600\tIN\tNS\tns1.foo.com.',
917 name
+ '\t3600\tIN\tNS\tns2.foo.com.',
918 name
+ '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name
+
919 ' 0 10800 3600 604800 3600']
920 self
.assertEquals(data
['zone'].strip().split('\n'), expected_data
)
922 def test_export_zone_text(self
):
923 name
, payload
, zone
= self
.create_zone(nameservers
=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api
='')
925 r
= self
.session
.get(
926 self
.url("/api/v1/servers/localhost/zones/" + name
+ "/export"),
927 headers
={'accept': '*/*'}
929 data
= r
.text
.strip().split("\n")
930 expected_data
= [name
+ '\t3600\tIN\tNS\tns1.foo.com.',
931 name
+ '\t3600\tIN\tNS\tns2.foo.com.',
932 name
+ '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name
+
933 ' 0 10800 3600 604800 3600']
934 self
.assertEquals(data
, expected_data
)
936 def test_update_zone(self
):
937 name
, payload
, zone
= self
.create_zone()
938 name
= payload
['name']
939 # update, set as Master and enable SOA-EDIT-API
942 'masters': ['192.0.2.1', '192.0.2.2'],
943 'soa_edit_api': 'EPOCH',
946 r
= self
.session
.put(
947 self
.url("/api/v1/servers/localhost/zones/" + name
),
948 data
=json
.dumps(payload
),
949 headers
={'content-type': 'application/json'})
950 self
.assert_success(r
)
951 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
952 for k
in payload
.keys():
953 self
.assertIn(k
, data
)
954 self
.assertEquals(data
[k
], payload
[k
])
955 # update, back to Native and empty(off)
961 r
= self
.session
.put(
962 self
.url("/api/v1/servers/localhost/zones/" + name
),
963 data
=json
.dumps(payload
),
964 headers
={'content-type': 'application/json'})
965 self
.assert_success(r
)
966 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
967 for k
in payload
.keys():
968 self
.assertIn(k
, data
)
969 self
.assertEquals(data
[k
], payload
[k
])
971 def test_zone_rr_update(self
):
972 name
, payload
, zone
= self
.create_zone()
973 # do a replace (= update)
975 'changetype': 'replace',
981 "content": "ns1.bar.com.",
985 "content": "ns2-disabled.bar.com.",
990 payload
= {'rrsets': [rrset
]}
991 r
= self
.session
.patch(
992 self
.url("/api/v1/servers/localhost/zones/" + name
),
993 data
=json
.dumps(payload
),
994 headers
={'content-type': 'application/json'})
995 self
.assert_success(r
)
996 # verify that (only) the new record is there
997 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
998 self
.assertEquals(get_rrset(data
, name
, 'NS')['records'], rrset
['records'])
1000 def test_zone_rr_update_mx(self
):
1001 # Important to test with MX records, as they have a priority field, which must end up in the content field.
1002 name
, payload
, zone
= self
.create_zone()
1003 # do a replace (= update)
1005 'changetype': 'replace',
1011 "content": "10 mail.example.org.",
1016 payload
= {'rrsets': [rrset
]}
1017 r
= self
.session
.patch(
1018 self
.url("/api/v1/servers/localhost/zones/" + name
),
1019 data
=json
.dumps(payload
),
1020 headers
={'content-type': 'application/json'})
1021 self
.assert_success(r
)
1022 # verify that (only) the new record is there
1023 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1024 self
.assertEquals(get_rrset(data
, name
, 'MX')['records'], rrset
['records'])
1026 def test_zone_rr_update_invalid_mx(self
):
1027 name
, payload
, zone
= self
.create_zone()
1028 # do a replace (= update)
1030 'changetype': 'replace',
1036 "content": "10 mail@mx.example.org.",
1041 payload
= {'rrsets': [rrset
]}
1042 r
= self
.session
.patch(
1043 self
.url("/api/v1/servers/localhost/zones/" + name
),
1044 data
=json
.dumps(payload
),
1045 headers
={'content-type': 'application/json'})
1046 self
.assertEquals(r
.status_code
, 422)
1047 self
.assertIn('non-hostname content', r
.json()['error'])
1048 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1049 self
.assertIsNone(get_rrset(data
, name
, 'MX'))
1051 def test_zone_rr_update_opt(self
):
1052 name
, payload
, zone
= self
.create_zone()
1053 # do a replace (= update)
1055 'changetype': 'replace',
1066 payload
= {'rrsets': [rrset
]}
1067 r
= self
.session
.patch(
1068 self
.url("/api/v1/servers/localhost/zones/" + name
),
1069 data
=json
.dumps(payload
),
1070 headers
={'content-type': 'application/json'})
1071 self
.assertEquals(r
.status_code
, 422)
1072 self
.assertIn('OPT: invalid type given', r
.json()['error'])
1074 def test_zone_rr_update_multiple_rrsets(self
):
1075 name
, payload
, zone
= self
.create_zone()
1077 'changetype': 'replace',
1084 "content": "ns9999.example.com.",
1090 'changetype': 'replace',
1096 "content": "10 mx444.example.com.",
1101 payload
= {'rrsets': [rrset1
, rrset2
]}
1102 r
= self
.session
.patch(
1103 self
.url("/api/v1/servers/localhost/zones/" + name
),
1104 data
=json
.dumps(payload
),
1105 headers
={'content-type': 'application/json'})
1106 self
.assert_success(r
)
1107 # verify that all rrsets have been updated
1108 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1109 self
.assertEquals(get_rrset(data
, name
, 'NS')['records'], rrset1
['records'])
1110 self
.assertEquals(get_rrset(data
, name
, 'MX')['records'], rrset2
['records'])
1112 def test_zone_rr_update_duplicate_record(self
):
1113 name
, payload
, zone
= self
.create_zone()
1115 'changetype': 'replace',
1120 {"content": "ns9999.example.com.", "disabled": False},
1121 {"content": "ns9996.example.com.", "disabled": False},
1122 {"content": "ns9987.example.com.", "disabled": False},
1123 {"content": "ns9988.example.com.", "disabled": False},
1124 {"content": "ns9999.example.com.", "disabled": False},
1127 payload
= {'rrsets': [rrset
]}
1128 r
= self
.session
.patch(
1129 self
.url("/api/v1/servers/localhost/zones/" + name
),
1130 data
=json
.dumps(payload
),
1131 headers
={'content-type': 'application/json'})
1132 self
.assertEquals(r
.status_code
, 422)
1133 self
.assertIn('Duplicate record in RRset', r
.json()['error'])
1135 def test_zone_rr_update_duplicate_rrset(self
):
1136 name
, payload
, zone
= self
.create_zone()
1138 'changetype': 'replace',
1144 "content": "ns9999.example.com.",
1150 'changetype': 'replace',
1156 "content": "ns9998.example.com.",
1161 payload
= {'rrsets': [rrset1
, rrset2
]}
1162 r
= self
.session
.patch(
1163 self
.url("/api/v1/servers/localhost/zones/" + name
),
1164 data
=json
.dumps(payload
),
1165 headers
={'content-type': 'application/json'})
1166 self
.assertEquals(r
.status_code
, 422)
1167 self
.assertIn('Duplicate RRset', r
.json()['error'])
1169 def test_zone_rr_delete(self
):
1170 name
, payload
, zone
= self
.create_zone()
1171 # do a delete of all NS records (these are created with the zone)
1173 'changetype': 'delete',
1177 payload
= {'rrsets': [rrset
]}
1178 r
= self
.session
.patch(
1179 self
.url("/api/v1/servers/localhost/zones/" + name
),
1180 data
=json
.dumps(payload
),
1181 headers
={'content-type': 'application/json'})
1182 self
.assert_success(r
)
1183 # verify that the records are gone
1184 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1185 self
.assertIsNone(get_rrset(data
, name
, 'NS'))
1187 def test_zone_disable_reenable(self
):
1188 # This also tests that SOA-EDIT-API works.
1189 name
, payload
, zone
= self
.create_zone(soa_edit_api
='EPOCH')
1190 # disable zone by disabling SOA
1192 'changetype': 'replace',
1198 "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
1203 payload
= {'rrsets': [rrset
]}
1204 r
= self
.session
.patch(
1205 self
.url("/api/v1/servers/localhost/zones/" + name
),
1206 data
=json
.dumps(payload
),
1207 headers
={'content-type': 'application/json'})
1208 self
.assert_success(r
)
1209 # check SOA serial has been edited
1210 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1211 soa_serial1
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1212 self
.assertNotEquals(soa_serial1
, '1')
1213 # make sure domain is still in zone list (disabled SOA!)
1214 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones"))
1216 self
.assertEquals(len([domain
for domain
in domains
if domain
['name'] == name
]), 1)
1217 # sleep 1sec to ensure the EPOCH value changes for the next request
1219 # verify that modifying it still works
1220 rrset
['records'][0]['disabled'] = False
1221 payload
= {'rrsets': [rrset
]}
1222 r
= self
.session
.patch(
1223 self
.url("/api/v1/servers/localhost/zones/" + name
),
1224 data
=json
.dumps(payload
),
1225 headers
={'content-type': 'application/json'})
1226 self
.assert_success(r
)
1227 # check SOA serial has been edited again
1228 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1229 soa_serial2
= get_first_rec(data
, name
, 'SOA')['content'].split()[2]
1230 self
.assertNotEquals(soa_serial2
, '1')
1231 self
.assertNotEquals(soa_serial2
, soa_serial1
)
1233 def test_zone_rr_update_out_of_zone(self
):
1234 name
, payload
, zone
= self
.create_zone()
1235 # replace with qname mismatch
1237 'changetype': 'replace',
1238 'name': 'not-in-zone.',
1243 "content": "ns1.bar.com.",
1248 payload
= {'rrsets': [rrset
]}
1249 r
= self
.session
.patch(
1250 self
.url("/api/v1/servers/localhost/zones/" + name
),
1251 data
=json
.dumps(payload
),
1252 headers
={'content-type': 'application/json'})
1253 self
.assertEquals(r
.status_code
, 422)
1254 self
.assertIn('out of zone', r
.json()['error'])
1256 def test_zone_rr_update_restricted_chars(self
):
1257 name
, payload
, zone
= self
.create_zone()
1258 # replace with qname mismatch
1260 'changetype': 'replace',
1261 'name': 'test:' + name
,
1266 "content": "ns1.bar.com.",
1271 payload
= {'rrsets': [rrset
]}
1272 r
= self
.session
.patch(
1273 self
.url("/api/v1/servers/localhost/zones/" + name
),
1274 data
=json
.dumps(payload
),
1275 headers
={'content-type': 'application/json'})
1276 self
.assertEquals(r
.status_code
, 422)
1277 self
.assertIn('contains unsupported characters', r
.json()['error'])
1279 def test_rrset_unknown_type(self
):
1280 name
, payload
, zone
= self
.create_zone()
1282 'changetype': 'replace',
1288 "content": "4.3.2.1",
1293 payload
= {'rrsets': [rrset
]}
1294 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1295 headers
={'content-type': 'application/json'})
1296 self
.assertEquals(r
.status_code
, 422)
1297 self
.assertIn('unknown type', r
.json()['error'])
1299 @parameterized.expand([
1302 def test_rrset_exclusive_and_other(self
, qtype
):
1303 name
, payload
, zone
= self
.create_zone()
1305 'changetype': 'replace',
1311 "content": "example.org.",
1316 payload
= {'rrsets': [rrset
]}
1317 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1318 headers
={'content-type': 'application/json'})
1319 self
.assertEquals(r
.status_code
, 422)
1320 self
.assertIn('Conflicts with pre-existing RRset', r
.json()['error'])
1322 @parameterized.expand([
1325 def test_rrset_other_and_exclusive(self
, qtype
):
1326 name
, payload
, zone
= self
.create_zone()
1328 'changetype': 'replace',
1329 'name': 'sub.'+name
,
1334 "content": "example.org.",
1339 payload
= {'rrsets': [rrset
]}
1340 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1341 headers
={'content-type': 'application/json'})
1342 self
.assert_success(r
)
1344 'changetype': 'replace',
1345 'name': 'sub.'+name
,
1350 "content": "1.2.3.4",
1355 payload
= {'rrsets': [rrset
]}
1356 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1357 headers
={'content-type': 'application/json'})
1358 self
.assertEquals(r
.status_code
, 422)
1359 self
.assertIn('Conflicts with pre-existing RRset', r
.json()['error'])
1361 @parameterized.expand([
1362 ('SOA', ['ns1.example.org. test@example.org. 10 10800 3600 604800 3600', 'ns2.example.org. test@example.org. 10 10800 3600 604800 3600']),
1363 ('CNAME', ['01.example.org.', '02.example.org.']),
1365 def test_rrset_single_qtypes(self
, qtype
, contents
):
1366 name
, payload
, zone
= self
.create_zone()
1368 'changetype': 'replace',
1369 'name': 'sub.'+name
,
1374 "content": contents
[0],
1378 "content": contents
[1],
1383 payload
= {'rrsets': [rrset
]}
1384 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1385 headers
={'content-type': 'application/json'})
1386 self
.assertEquals(r
.status_code
, 422)
1387 self
.assertIn('IN ' + qtype
+ ' has more than one record', r
.json()['error'])
1389 def test_rrset_zone_apex(self
):
1390 name
, payload
, zone
= self
.create_zone()
1392 'changetype': 'replace',
1398 "content": 'ns1.example.org. test@example.org. 10 10800 3600 604800 3600',
1404 'changetype': 'replace',
1410 "content": 'example.com.',
1416 payload
= {'rrsets': [rrset1
, rrset2
]}
1417 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1418 headers
={'content-type': 'application/json'})
1419 self
.assert_success(r
) # user should be able to create DNAME at APEX as per RFC 6672 section 2.3
1421 def test_rrset_ns_dname_exclude(self
):
1422 name
, payload
, zone
= self
.create_zone()
1424 'changetype': 'replace',
1425 'name': 'delegation.'+name
,
1430 "content": "ns.example.org.",
1435 payload
= {'rrsets': [rrset
]}
1436 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1437 headers
={'content-type': 'application/json'})
1438 self
.assert_success(r
)
1440 'changetype': 'replace',
1441 'name': 'delegation.'+name
,
1446 "content": "example.com.",
1451 payload
= {'rrsets': [rrset
]}
1452 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1453 headers
={'content-type': 'application/json'})
1454 self
.assertEquals(r
.status_code
, 422)
1455 self
.assertIn('Cannot have both NS and DNAME except in zone apex', r
.json()['error'])
1457 ## FIXME: Enable this when it's time for it
1458 # def test_rrset_dname_nothing_under(self):
1459 # name, payload, zone = self.create_zone()
1461 # 'changetype': 'replace',
1462 # 'name': 'delegation.'+name,
1467 # "content": "example.com.",
1472 # payload = {'rrsets': [rrset]}
1473 # r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1474 # headers={'content-type': 'application/json'})
1475 # self.assert_success(r)
1477 # 'changetype': 'replace',
1478 # 'name': 'sub.delegation.'+name,
1483 # "content": "1.2.3.4",
1488 # payload = {'rrsets': [rrset]}
1489 # r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1490 # headers={'content-type': 'application/json'})
1491 # self.assertEquals(r.status_code, 422)
1492 # self.assertIn('You cannot have record(s) under CNAME/DNAME', r.json()['error'])
1494 def test_create_zone_with_leading_space(self
):
1495 # Actual regression.
1496 name
, payload
, zone
= self
.create_zone()
1498 'changetype': 'replace',
1504 "content": " 4.3.2.1",
1509 payload
= {'rrsets': [rrset
]}
1510 r
= self
.session
.patch(self
.url("/api/v1/servers/localhost/zones/" + name
), data
=json
.dumps(payload
),
1511 headers
={'content-type': 'application/json'})
1512 self
.assertEquals(r
.status_code
, 422)
1513 self
.assertIn('Not in expected format', r
.json()['error'])
1515 def test_zone_rr_delete_out_of_zone(self
):
1516 name
, payload
, zone
= self
.create_zone()
1518 'changetype': 'delete',
1519 'name': 'not-in-zone.',
1522 payload
= {'rrsets': [rrset
]}
1523 r
= self
.session
.patch(
1524 self
.url("/api/v1/servers/localhost/zones/" + name
),
1525 data
=json
.dumps(payload
),
1526 headers
={'content-type': 'application/json'})
1528 self
.assert_success(r
) # succeed so users can fix their wrong, old data
1530 def test_zone_delete(self
):
1531 name
, payload
, zone
= self
.create_zone()
1532 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + name
))
1533 self
.assertEquals(r
.status_code
, 204)
1534 self
.assertNotIn('Content-Type', r
.headers
)
1536 def test_zone_comment_create(self
):
1537 name
, payload
, zone
= self
.create_zone()
1539 'changetype': 'replace',
1546 'content': 'blah blah',
1550 'content': 'blah blah bleh',
1554 payload
= {'rrsets': [rrset
]}
1555 r
= self
.session
.patch(
1556 self
.url("/api/v1/servers/localhost/zones/" + name
),
1557 data
=json
.dumps(payload
),
1558 headers
={'content-type': 'application/json'})
1559 self
.assert_success(r
)
1560 # make sure the comments have been set, and that the NS
1561 # records are still present
1562 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1563 serverset
= get_rrset(data
, name
, 'NS')
1565 self
.assertNotEquals(serverset
['records'], [])
1566 self
.assertNotEquals(serverset
['comments'], [])
1567 # verify that modified_at has been set by pdns
1568 self
.assertNotEquals([c
for c
in serverset
['comments']][0]['modified_at'], 0)
1569 # verify that TTL is correct (regression test)
1570 self
.assertEquals(serverset
['ttl'], 3600)
1572 def test_zone_comment_delete(self
):
1573 # Test: Delete ONLY comments.
1574 name
, payload
, zone
= self
.create_zone()
1576 'changetype': 'replace',
1581 payload
= {'rrsets': [rrset
]}
1582 r
= self
.session
.patch(
1583 self
.url("/api/v1/servers/localhost/zones/" + name
),
1584 data
=json
.dumps(payload
),
1585 headers
={'content-type': 'application/json'})
1586 self
.assert_success(r
)
1587 # make sure the NS records are still present
1588 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1589 serverset
= get_rrset(data
, name
, 'NS')
1591 self
.assertNotEquals(serverset
['records'], [])
1592 self
.assertEquals(serverset
['comments'], [])
1594 def test_zone_comment_out_of_range_modified_at(self
):
1595 # Test if comments on an rrset stay intact if the rrset is replaced
1596 name
, payload
, zone
= self
.create_zone()
1598 'changetype': 'replace',
1604 'content': 'oh hi there',
1605 'modified_at': '4294967297'
1609 payload
= {'rrsets': [rrset
]}
1610 r
= self
.session
.patch(
1611 self
.url("/api/v1/servers/localhost/zones/" + name
),
1612 data
=json
.dumps(payload
),
1613 headers
={'content-type': 'application/json'})
1614 self
.assertEquals(r
.status_code
, 422)
1615 self
.assertIn("Value for key 'modified_at' is out of range", r
.json()['error'])
1617 def test_zone_comment_stay_intact(self
):
1618 # Test if comments on an rrset stay intact if the rrset is replaced
1619 name
, payload
, zone
= self
.create_zone()
1622 'changetype': 'replace',
1628 'content': 'oh hi there',
1633 payload
= {'rrsets': [rrset
]}
1634 r
= self
.session
.patch(
1635 self
.url("/api/v1/servers/localhost/zones/" + name
),
1636 data
=json
.dumps(payload
),
1637 headers
={'content-type': 'application/json'})
1638 self
.assert_success(r
)
1639 # replace rrset records
1641 'changetype': 'replace',
1647 "content": "ns1.bar.com.",
1652 payload2
= {'rrsets': [rrset2
]}
1653 r
= self
.session
.patch(
1654 self
.url("/api/v1/servers/localhost/zones/" + name
),
1655 data
=json
.dumps(payload2
),
1656 headers
={'content-type': 'application/json'})
1657 self
.assert_success(r
)
1658 # make sure the comments still exist
1659 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + name
)).json()
1660 serverset
= get_rrset(data
, name
, 'NS')
1662 self
.assertEquals(serverset
['records'], rrset2
['records'])
1663 self
.assertEquals(serverset
['comments'], rrset
['comments'])
1665 def test_zone_auto_ptr_ipv4_create(self
):
1666 revzone
= '4.2.192.in-addr.arpa.'
1667 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1668 name
= unique_zone_name()
1674 "content": "192.2.4.44",
1679 name
, payload
, data
= self
.create_zone(name
=name
, rrsets
=[rrset
])
1680 del rrset
['records'][0]['set-ptr']
1681 self
.assertEquals(get_rrset(data
, name
, 'A')['records'], rrset
['records'])
1682 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1683 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1685 self
.assertEquals(revsets
, [{
1686 u
'name': u
'44.4.2.192.in-addr.arpa.',
1695 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1696 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1698 def test_zone_auto_ptr_ipv4_update(self
):
1699 revzone
= '0.2.192.in-addr.arpa.'
1700 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1701 name
, payload
, zone
= self
.create_zone()
1703 'changetype': 'replace',
1709 "content": '192.2.0.2',
1715 payload
= {'rrsets': [rrset
]}
1716 r
= self
.session
.patch(
1717 self
.url("/api/v1/servers/localhost/zones/" + name
),
1718 data
=json
.dumps(payload
),
1719 headers
={'content-type': 'application/json'})
1720 self
.assert_success(r
)
1721 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1722 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1724 self
.assertEquals(revsets
, [{
1725 u
'name': u
'2.0.2.192.in-addr.arpa.',
1734 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1735 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1737 def test_zone_auto_ptr_ipv6_update(self
):
1739 revzone
= '8.b.d.0.1.0.0.2.ip6.arpa.'
1740 _
, _
, revzonedata
= self
.create_zone(name
=revzone
)
1741 name
, payload
, zone
= self
.create_zone()
1743 'changetype': 'replace',
1749 "content": '2001:DB8::bb:aa',
1755 payload
= {'rrsets': [rrset
]}
1756 r
= self
.session
.patch(
1757 self
.url("/api/v1/servers/localhost/zones/" + name
),
1758 data
=json
.dumps(payload
),
1759 headers
={'content-type': 'application/json'})
1760 self
.assert_success(r
)
1761 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + revzone
)).json()
1762 revsets
= [s
for s
in r
['rrsets'] if s
['type'] == 'PTR']
1764 self
.assertEquals(revsets
, [{
1765 u
'name': u
'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
1774 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1775 self
.assertGreater(r
['serial'], revzonedata
['serial'])
1777 def test_search_rr_exact_zone(self
):
1778 name
= unique_zone_name()
1779 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
1780 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.')))
1781 self
.assert_success_json(r
)
1783 self
.assertEquals(r
.json(), [
1784 {u
'object_type': u
'zone', u
'name': name
, u
'zone_id': name
},
1785 {u
'content': u
'ns1.example.com.',
1786 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1787 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1788 {u
'content': u
'ns2.example.com.',
1789 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1790 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1791 {u
'content': u
'a.misconfigured.powerdns.server. hostmaster.'+name
+' 22 10800 3600 604800 3600',
1792 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1793 u
'ttl': 3600, u
'type': u
'SOA', u
'name': name
},
1796 def test_search_rr_exact_zone_filter_type_zone(self
):
1797 name
= unique_zone_name()
1799 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
1800 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.') + "&object_type=" + data_type
))
1801 self
.assert_success_json(r
)
1803 self
.assertEquals(r
.json(), [
1804 {u
'object_type': u
'zone', u
'name': name
, u
'zone_id': name
},
1807 def test_search_rr_exact_zone_filter_type_record(self
):
1808 name
= unique_zone_name()
1809 data_type
= "record"
1810 self
.create_zone(name
=name
, serial
=22, soa_edit_api
='')
1811 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
.rstrip('.') + "&object_type=" + data_type
))
1812 self
.assert_success_json(r
)
1814 self
.assertEquals(r
.json(), [
1815 {u
'content': u
'ns1.example.com.',
1816 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1817 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1818 {u
'content': u
'ns2.example.com.',
1819 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1820 u
'ttl': 3600, u
'type': u
'NS', u
'name': name
},
1821 {u
'content': u
'a.misconfigured.powerdns.server. hostmaster.'+name
+' 22 10800 3600 604800 3600',
1822 u
'zone_id': name
, u
'zone': name
, u
'object_type': u
'record', u
'disabled': False,
1823 u
'ttl': 3600, u
'type': u
'SOA', u
'name': name
},
1826 def test_search_rr_substring(self
):
1827 name
= unique_zone_name()
1829 self
.create_zone(name
=name
)
1830 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
1831 self
.assert_success_json(r
)
1833 # should return zone, SOA, ns1, ns2
1834 self
.assertEquals(len(r
.json()), 4)
1836 def test_search_rr_case_insensitive(self
):
1837 name
= unique_zone_name()+'testsuffix.'
1838 self
.create_zone(name
=name
)
1839 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
1840 self
.assert_success_json(r
)
1842 # should return zone, SOA, ns1, ns2
1843 self
.assertEquals(len(r
.json()), 4)
1845 def test_search_after_rectify_with_ent(self
):
1846 name
= unique_zone_name()
1847 search
= name
.split('.')[0]
1849 "name": 'sub.sub.' + name
,
1853 "content": "4.3.2.1",
1857 self
.create_zone(name
=name
, rrsets
=[rrset
])
1858 pdnsutil_rectify(name
)
1859 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=*%s*" % search
))
1860 self
.assert_success_json(r
)
1862 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
1863 self
.assertEquals(len(r
.json()), 5)
1865 def test_default_api_rectify(self
):
1866 name
= unique_zone_name()
1867 search
= name
.split('.')[0]
1870 "name": 'a.' + name
,
1874 "content": "2001:DB8::1",
1879 "name": 'b.' + name
,
1883 "content": "2001:DB8::2",
1888 self
.create_zone(name
=name
, rrsets
=rrsets
, dnssec
=True, nsec3param
='1 0 1 ab')
1889 dbrecs
= get_db_records(name
, 'AAAA')
1890 self
.assertIsNotNone(dbrecs
[0]['ordername'])
1892 def test_override_api_rectify(self
):
1893 name
= unique_zone_name()
1894 search
= name
.split('.')[0]
1897 "name": 'a.' + name
,
1901 "content": "2001:DB8::1",
1906 "name": 'b.' + name
,
1910 "content": "2001:DB8::2",
1915 self
.create_zone(name
=name
, rrsets
=rrsets
, api_rectify
=False, dnssec
=True, nsec3param
='1 0 1 ab')
1916 dbrecs
= get_db_records(name
, 'AAAA')
1917 self
.assertIsNone(dbrecs
[0]['ordername'])
1919 def test_cname_at_ent_place(self
):
1920 name
, payload
, zone
= self
.create_zone(dnssec
=True, api_rectify
=True)
1922 'changetype': 'replace',
1923 'name': 'sub2.sub1.' + name
,
1927 'content': "4.3.2.1",
1931 payload
= {'rrsets': [rrset
]}
1932 r
= self
.session
.patch(
1933 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
1934 data
=json
.dumps(payload
),
1935 headers
={'content-type': 'application/json'})
1936 self
.assertEquals(r
.status_code
, 204)
1938 'changetype': 'replace',
1939 'name': 'sub1.' + name
,
1943 'content': "www.example.org.",
1947 payload
= {'rrsets': [rrset
]}
1948 r
= self
.session
.patch(
1949 self
.url("/api/v1/servers/localhost/zones/" + zone
['id']),
1950 data
=json
.dumps(payload
),
1951 headers
={'content-type': 'application/json'})
1952 self
.assertEquals(r
.status_code
, 204)
1954 def test_rrset_parameter_post_false(self
):
1955 name
= unique_zone_name()
1959 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
1961 r
= self
.session
.post(
1962 self
.url("/api/v1/servers/localhost/zones?rrsets=false"),
1963 data
=json
.dumps(payload
),
1964 headers
={'content-type': 'application/json'})
1966 self
.assert_success_json(r
)
1967 self
.assertEquals(r
.status_code
, 201)
1968 self
.assertEquals(r
.json().get('rrsets'), None)
1970 def test_rrset_false_parameter(self
):
1971 name
= unique_zone_name()
1972 self
.create_zone(name
=name
, kind
='Native')
1973 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/"+name
+"?rrsets=false"))
1974 self
.assert_success_json(r
)
1976 self
.assertEquals(r
.json().get('rrsets'), None)
1978 def test_rrset_true_parameter(self
):
1979 name
= unique_zone_name()
1980 self
.create_zone(name
=name
, kind
='Native')
1981 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/"+name
+"?rrsets=true"))
1982 self
.assert_success_json(r
)
1984 self
.assertEquals(len(r
.json().get('rrsets')), 2)
1986 def test_wrong_rrset_parameter(self
):
1987 name
= unique_zone_name()
1988 self
.create_zone(name
=name
, kind
='Native')
1989 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/"+name
+"?rrsets=foobar"))
1990 self
.assertEquals(r
.status_code
, 422)
1991 self
.assertIn("'rrsets' request parameter value 'foobar' is not supported", r
.json()['error'])
1993 def test_put_master_tsig_key_ids_non_existent(self
):
1994 name
= unique_zone_name()
1995 keyname
= unique_zone_name().split('.')[0]
1996 self
.create_zone(name
=name
, kind
='Native')
1998 'master_tsig_key_ids': [keyname
]
2000 r
= self
.session
.put(self
.url('/api/v1/servers/localhost/zones/' + name
),
2001 data
=json
.dumps(payload
),
2002 headers
={'content-type': 'application/json'})
2003 self
.assertEquals(r
.status_code
, 422)
2004 self
.assertIn('A TSIG key with the name', r
.json()['error'])
2006 def test_put_slave_tsig_key_ids_non_existent(self
):
2007 name
= unique_zone_name()
2008 keyname
= unique_zone_name().split('.')[0]
2009 self
.create_zone(name
=name
, kind
='Native')
2011 'slave_tsig_key_ids': [keyname
]
2013 r
= self
.session
.put(self
.url('/api/v1/servers/localhost/zones/' + name
),
2014 data
=json
.dumps(payload
),
2015 headers
={'content-type': 'application/json'})
2016 self
.assertEquals(r
.status_code
, 422)
2017 self
.assertIn('A TSIG key with the name', r
.json()['error'])
2020 @unittest.skipIf(not is_auth(), "Not applicable")
2021 class AuthRootZone(ApiTestCase
, AuthZonesHelperMixin
):
2024 super(AuthRootZone
, self
).setUp()
2025 # zone name is not unique, so delete the zone before each individual test.
2026 self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/=2E"))
2028 def test_create_zone(self
):
2029 name
, payload
, data
= self
.create_zone(name
='.', serial
=22, soa_edit_api
='')
2030 for k
in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
2031 self
.assertIn(k
, data
)
2033 self
.assertEquals(data
[k
], payload
[k
])
2034 # validate generated SOA
2035 rec
= get_first_rec(data
, '.', 'SOA')
2038 "a.misconfigured.powerdns.server. hostmaster. " + str(payload
['serial']) +
2039 " 10800 3600 604800 3600"
2041 # Regression test: verify zone list works
2042 zonelist
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones")).json()
2043 print("zonelist:", zonelist
)
2044 self
.assertIn(payload
['name'], [zone
['name'] for zone
in zonelist
])
2045 # Also test that fetching the zone works.
2046 print("id:", data
['id'])
2047 self
.assertEquals(data
['id'], '=2E')
2048 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + data
['id'])).json()
2049 print("zone (fetched):", data
)
2050 for k
in ('name', 'kind'):
2051 self
.assertIn(k
, data
)
2052 self
.assertEquals(data
[k
], payload
[k
])
2053 self
.assertEqual(data
['rrsets'][0]['name'], '.')
2055 def test_update_zone(self
):
2056 name
, payload
, zone
= self
.create_zone(name
='.')
2058 # update, set as Master and enable SOA-EDIT-API
2061 'masters': ['192.0.2.1', '192.0.2.2'],
2062 'soa_edit_api': 'EPOCH',
2065 r
= self
.session
.put(
2066 self
.url("/api/v1/servers/localhost/zones/" + zone_id
),
2067 data
=json
.dumps(payload
),
2068 headers
={'content-type': 'application/json'})
2069 self
.assert_success(r
)
2070 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
)).json()
2071 for k
in payload
.keys():
2072 self
.assertIn(k
, data
)
2073 self
.assertEquals(data
[k
], payload
[k
])
2074 # update, back to Native and empty(off)
2080 r
= self
.session
.put(
2081 self
.url("/api/v1/servers/localhost/zones/" + zone_id
),
2082 data
=json
.dumps(payload
),
2083 headers
={'content-type': 'application/json'})
2084 self
.assert_success(r
)
2085 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + zone_id
)).json()
2086 for k
in payload
.keys():
2087 self
.assertIn(k
, data
)
2088 self
.assertEquals(data
[k
], payload
[k
])
2091 @unittest.skipIf(not is_recursor(), "Not applicable")
2092 class RecursorZones(ApiTestCase
):
2094 def create_zone(self
, name
=None, kind
=None, rd
=False, servers
=None):
2096 name
= unique_zone_name()
2103 'recursion_desired': rd
2105 r
= self
.session
.post(
2106 self
.url("/api/v1/servers/localhost/zones"),
2107 data
=json
.dumps(payload
),
2108 headers
={'content-type': 'application/json'})
2109 self
.assert_success_json(r
)
2110 return payload
, r
.json()
2112 def test_create_auth_zone(self
):
2113 payload
, data
= self
.create_zone(kind
='Native')
2114 for k
in payload
.keys():
2115 self
.assertEquals(data
[k
], payload
[k
])
2117 def test_create_zone_no_name(self
):
2121 'servers': ['8.8.8.8'],
2122 'recursion_desired': False,
2125 r
= self
.session
.post(
2126 self
.url("/api/v1/servers/localhost/zones"),
2127 data
=json
.dumps(payload
),
2128 headers
={'content-type': 'application/json'})
2129 self
.assertEquals(r
.status_code
, 422)
2130 self
.assertIn('is not canonical', r
.json()['error'])
2132 def test_create_forwarded_zone(self
):
2133 payload
, data
= self
.create_zone(kind
='Forwarded', rd
=False, servers
=['8.8.8.8'])
2134 # return values are normalized
2135 payload
['servers'][0] += ':53'
2136 for k
in payload
.keys():
2137 self
.assertEquals(data
[k
], payload
[k
])
2139 def test_create_forwarded_rd_zone(self
):
2140 payload
, data
= self
.create_zone(name
='google.com.', kind
='Forwarded', rd
=True, servers
=['8.8.8.8'])
2141 # return values are normalized
2142 payload
['servers'][0] += ':53'
2143 for k
in payload
.keys():
2144 self
.assertEquals(data
[k
], payload
[k
])
2146 def test_create_auth_zone_with_symbols(self
):
2147 payload
, data
= self
.create_zone(name
='foo/bar.'+unique_zone_name(), kind
='Native')
2148 expected_id
= (payload
['name'].replace('/', '=2F'))
2149 for k
in payload
.keys():
2150 self
.assertEquals(data
[k
], payload
[k
])
2151 self
.assertEquals(data
['id'], expected_id
)
2153 def test_rename_auth_zone(self
):
2154 payload
, data
= self
.create_zone(kind
='Native')
2155 name
= payload
['name']
2158 'name': 'renamed-'+name
,
2160 'recursion_desired': False
2162 r
= self
.session
.put(
2163 self
.url("/api/v1/servers/localhost/zones/" + name
),
2164 data
=json
.dumps(payload
),
2165 headers
={'content-type': 'application/json'})
2166 self
.assert_success(r
)
2167 data
= self
.session
.get(self
.url("/api/v1/servers/localhost/zones/" + payload
['name'])).json()
2168 for k
in payload
.keys():
2169 self
.assertEquals(data
[k
], payload
[k
])
2171 def test_zone_delete(self
):
2172 payload
, zone
= self
.create_zone(kind
='Native')
2173 name
= payload
['name']
2174 r
= self
.session
.delete(self
.url("/api/v1/servers/localhost/zones/" + name
))
2175 self
.assertEquals(r
.status_code
, 204)
2176 self
.assertNotIn('Content-Type', r
.headers
)
2178 def test_search_rr_exact_zone(self
):
2179 name
= unique_zone_name()
2180 self
.create_zone(name
=name
, kind
='Native')
2181 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=" + name
))
2182 self
.assert_success_json(r
)
2184 self
.assertEquals(r
.json(), [{u
'type': u
'zone', u
'name': name
, u
'zone_id': name
}])
2186 def test_search_rr_substring(self
):
2187 name
= 'search-rr-zone.name.'
2188 self
.create_zone(name
=name
, kind
='Native')
2189 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
2190 self
.assert_success_json(r
)
2192 # should return zone, SOA
2193 self
.assertEquals(len(r
.json()), 2)
2195 @unittest.skipIf(not is_auth(), "Not applicable")
2196 class AuthZoneKeys(ApiTestCase
, AuthZonesHelperMixin
):
2198 def test_get_keys(self
):
2199 r
= self
.session
.get(
2200 self
.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2201 self
.assert_success_json(r
)
2203 self
.assertGreater(len(keys
), 0)
2205 key0
= deepcopy(keys
[0])
2209 u
'algorithm': u
'ECDSAP256SHA256',
2212 u
'type': u
'Cryptokey',
2217 self
.assertEquals(key0
, expected
)
2219 keydata
= keys
[0]['dnskey'].split()
2220 self
.assertEqual(len(keydata
), 4)