]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
Merge pull request #6737 from chbruyand/dnsdist-consistent-hashing
[thirdparty/pdns.git] / regression-tests.api / test_Zones.py
1 from __future__ import print_function
2 import json
3 import time
4 import unittest
5 from copy import deepcopy
6 from pprint import pprint
7 from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor, get_db_records, pdnsutil_rectify
8
9
def get_rrset(data, qname, qtype):
    """Return the first rrset of ``data['rrsets']`` matching name and type.

    An rrset matches when its ``'name'`` equals ``qname`` and its ``'type'``
    equals ``qtype``. Returns ``None`` when nothing matches.
    """
    matching = (
        candidate for candidate in data['rrsets']
        if candidate['name'] == qname and candidate['type'] == qtype
    )
    # The generator is lazy, so scanning stops at the first hit.
    return next(matching, None)
15
16
def get_first_rec(data, qname, qtype):
    """Return the first record of the rrset matching ``qname``/``qtype``.

    The rrset is looked up with ``get_rrset``; when that yields a falsy
    result, ``None`` is returned instead of a record.
    """
    found = get_rrset(data, qname, qtype)
    return found['records'][0] if found else None
22
23
def eq_zone_rrsets(rrsets, expected):
    """Assert that ``rrsets`` contain exactly the records listed in ``expected``.

    ``expected`` maps a record type to a list of dicts with a ``'content'``
    key and optionally a ``'name'`` key. Only the types present in
    ``expected`` are compared. Records are reduced to
    ``(name, type, content)`` tuples; when none of the expected records of a
    type carries a ``'name'``, the name is replaced by ``'@'`` on both sides
    so owner names are ignored for that type.

    Raises ``AssertionError`` when the two reduced data sets differ.
    """
    got_by_type = {}
    wanted_by_type = {}
    for raw_type, wanted_records in expected.items():
        rtype = str(raw_type)
        compare_names = any('name' in wanted for wanted in wanted_records)

        # Reduce the received rrsets of this type to comparable tuples.
        got = set()
        got_by_type[rtype] = got
        of_this_type = [candidate for candidate in rrsets if candidate['type'] == rtype]
        for received in of_this_type:
            print(received)
            for rec in received['records']:
                owner = received['name'] if compare_names else '@'
                got.add((owner, received['type'], rec['content']))

        # Reduce the expected records the same way.
        wanted_by_type[rtype] = set(
            (wanted['name'] if compare_names else '@', rtype, wanted['content'])
            for wanted in wanted_records
        )

    print("eq_zone_rrsets: got:")
    pprint(got_by_type)
    print("eq_zone_rrsets: expected:")
    pprint(wanted_by_type)

    assert got_by_type == wanted_by_type, "%r != %r" % (got_by_type, wanted_by_type)
47
48
class Zones(ApiTestCase):
    """Zone listing checks with daemon-specific expectations.

    The expected field set is chosen at runtime via ``is_auth()`` /
    ``is_recursor()``.
    """

    def test_list_zones(self):
        """List all zones and check that example.com appears once with the required fields."""
        r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
        self.assert_success_json(r)
        domains = r.json()
        # Accept the name both with and without the trailing dot.
        example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
        self.assertEqual(len(example_com), 1)
        example_com = example_com[0]
        print(example_com)
        required_fields = ['id', 'url', 'name', 'kind']
        if is_auth():
            required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial', 'account']
            self.assertNotEqual(example_com['serial'], 0)
        elif is_recursor():
            required_fields = required_fields + ['recursion_desired', 'servers']
        for field in required_fields:
            self.assertIn(field, example_com)
67
68
class AuthZonesHelperMixin(object):
    """Mixin providing ``create_zone`` for test cases that need a fresh zone."""

    def create_zone(self, name=None, **kwargs):
        """POST a new zone and return ``(name, payload, reply)``.

        ``name`` defaults to ``unique_zone_name()``. The default payload is a
        Native zone with two nameservers. Each keyword argument overrides the
        payload key of the same name; passing ``None`` removes that key from
        the payload instead.

        Asserts that the server answers with a successful JSON reply and
        status code 201.
        """
        if name is None:
            name = unique_zone_name()
        payload = {
            'name': name,
            'kind': 'Native',
            'nameservers': ['ns1.example.com.', 'ns2.example.com.']
        }
        for k, v in kwargs.items():
            if v is None:
                # pop() with a default: a None for a key that is not part of
                # the default payload is a no-op rather than a KeyError.
                payload.pop(k, None)
            else:
                payload[k] = v
        print("sending", payload)
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success_json(r)
        self.assertEqual(r.status_code, 201)
        reply = r.json()
        print("reply", reply)
        return name, payload, reply
93
94
95 @unittest.skipIf(not is_auth(), "Not applicable")
96 class AuthZones(ApiTestCase, AuthZonesHelperMixin):
97
def test_create_zone(self):
    """Create a zone with a fixed serial and validate the reply and the stored SOA."""
    # soa_edit_api has a default, override with empty for this test
    name, payload, data = self.create_zone(serial=22, soa_edit_api='')
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
    # validate generated SOA
    expected_soa = "a.misconfigured.powerdns.server. hostmaster." + name + " " + \
        str(payload['serial']) + " 10800 3600 604800 3600"
    self.assertEqual(
        get_first_rec(data, name, 'SOA')['content'],
        expected_soa
    )
    # Because we had confusion about dots, check that the DB is without dots.
    dbrecs = get_db_records(name, 'SOA')
    self.assertEqual(dbrecs[0]['content'], expected_soa.replace('. ', ' '))
115
def test_create_zone_with_soa_edit_api(self):
    """With soa_edit_api='EPOCH' the generated serial must exceed the serial passed in."""
    # soa_edit_api wins over serial
    name, payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
    for k in ('soa_edit_api', ):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
    # generated EPOCH serial surely is > fixed serial we passed in
    print(data)
    self.assertGreater(data['serial'], payload['serial'])
    # The serial is the third space-separated field of the SOA content.
    soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
    self.assertGreater(soa_serial, payload['serial'])
    self.assertEqual(soa_serial, data['serial'])
129
def test_create_zone_with_account(self):
    """Create a zone with an account and check the reply echoes it back."""
    name, payload, data = self.create_zone(account='anaccount', serial=10)
    print(data)
    for k in ('account', ):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
138
def test_create_zone_default_soa_edit_api(self):
    """A zone created without soa_edit_api must report 'DEFAULT'."""
    name, payload, data = self.create_zone()
    print(data)
    self.assertEqual(data['soa_edit_api'], 'DEFAULT')
143
def test_create_zone_exists(self):
    """Creating a zone whose name already exists must be rejected with 409."""
    name, payload, data = self.create_zone()
    print(data)
    # Second creation request for the same name.
    payload = {
        'name': name,
        'kind': 'Native'
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 409)  # Conflict - already exists
157
def test_create_zone_with_soa_edit(self):
    """Check the SOA serial suffix goes from '01' to '02' after one rrset change."""
    name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
    print(data)
    self.assertEqual(data['soa_edit'], 'INCEPTION-INCREMENT')
    self.assertEqual(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    # These particular settings lead to the first serial set to YYYYMMDD01.
    self.assertEqual(soa_serial[-2:], '01')
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": "127.0.0.1",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    # The PATCH reply itself is not checked; the zone is re-fetched below.
    self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + data['id']),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
    data = r.json()
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    self.assertEqual(soa_serial[-2:], '02')
187
def test_create_zone_with_records(self):
    """Create a zone with an apex A rrset and check it is returned unchanged."""
    name = unique_zone_name()
    rrset = {
        "name": name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "4.3.2.1",
            "disabled": False,
        }],
    }
    name, payload, data = self.create_zone(name=name, rrsets=[rrset])
    # check our record has appeared
    self.assertEqual(get_rrset(data, name, 'A')['records'], rrset['records'])
202
def test_create_zone_with_wildcard_records(self):
    """Create a zone with a wildcard A rrset and check it is returned unchanged."""
    name = unique_zone_name()
    rrset = {
        "name": "*."+name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "4.3.2.1",
            "disabled": False,
        }],
    }
    name, payload, data = self.create_zone(name=name, rrsets=[rrset])
    # check our record has appeared
    self.assertEqual(get_rrset(data, rrset['name'], 'A')['records'], rrset['records'])
217
def test_create_zone_with_comments(self):
    """Create a zone with rrset comments and check which rrsets carry them back."""
    name = unique_zone_name()
    rrsets = [
        {
            "name": name,
            "type": "soa",  # test uppercasing of type, too.
            "comments": [{
                "account": "test1",
                "content": "blah blah",
                "modified_at": 11112,
            }],
        },
        {
            "name": name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::1",
                "disabled": False,
            }],
            "comments": [{
                "account": "test AAAA",
                "content": "blah blah AAAA",
                "modified_at": 11112,
            }],
        },
        {
            "name": name,
            "type": "TXT",
            "ttl": 3600,
            "records": [{
                "content": "\"test TXT\"",
                "disabled": False,
            }],
        },
        {
            "name": name,
            "type": "A",
            "ttl": 3600,
            "records": [{
                "content": "192.0.2.1",
                "disabled": False,
            }],
        },
    ]
    name, payload, data = self.create_zone(name=name, rrsets=rrsets)
    # NS records have been created
    self.assertEqual(len(data['rrsets']), len(rrsets) + 1)
    # check our comment has appeared
    self.assertEqual(get_rrset(data, name, 'SOA')['comments'], rrsets[0]['comments'])
    self.assertEqual(get_rrset(data, name, 'A')['comments'], [])
    self.assertEqual(get_rrset(data, name, 'TXT')['comments'], [])
    self.assertEqual(get_rrset(data, name, 'AAAA')['comments'], rrsets[1]['comments'])
271
def test_create_zone_uncanonical_nameservers(self):
    """A nameserver without trailing dot must be rejected with 422."""
    name = unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['uncanon.example.com']
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Nameserver is not canonical', r.json()['error'])
286
def test_create_auth_zone_no_name(self):
    """An empty zone name must be rejected with 422 as not canonical."""
    payload = {
        'name': '',
        'kind': 'Native',
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('is not canonical', r.json()['error'])
300
def test_create_zone_with_custom_soa(self):
    """Create a zone with an explicit SOA rrset and check API reply and DB content."""
    name = unique_zone_name()
    content = u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
    rrset = {
        "name": name,
        "type": "soa",  # test uppercasing of type, too.
        "ttl": 3600,
        "records": [{
            "content": content,
            "disabled": False,
        }],
    }
    # soa_edit_api is emptied so the serial in the custom SOA is kept as sent.
    name, payload, data = self.create_zone(name=name, rrsets=[rrset], soa_edit_api='')
    self.assertEqual(get_rrset(data, name, 'SOA')['records'], rrset['records'])
    dbrecs = get_db_records(name, 'SOA')
    # The DB row is expected without the trailing dots on the names.
    self.assertEqual(dbrecs[0]['content'], content.replace('. ', ' '))
317
def test_create_zone_double_dot(self):
    """A zone name containing '..' must be rejected with 422."""
    name = 'test..' + unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.']
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Unable to parse DNS Name', r.json()['error'])
332
def test_create_zone_restricted_chars(self):
    """A zone name containing ':' must be rejected with 422."""
    name = 'test:' + unique_zone_name()  # : isn't good as a name.
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com']
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('contains unsupported characters', r.json()['error'])
347
def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self):
    """Sending 'nameservers' together with an apex NS rrset must be rejected with 422."""
    name = unique_zone_name()
    rrset = {
        "name": name,
        "type": "NS",
        "ttl": 3600,
        "records": [{
            "content": "ns2.example.com.",
            "disabled": False,
        }],
    }
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.'],
        'rrsets': [rrset],
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r.json()['error'])
372
def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self):
    """Sending 'nameservers' together with an NS rrset below the apex must succeed."""
    zone = unique_zone_name()
    delegation = {
        "name": 'subzone.'+zone,
        "type": "NS",
        "ttl": 3600,
        "records": [
            {"content": "ns2.example.com.", "disabled": False},
        ],
    }
    body = {
        'name': zone,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.'],
        'rrsets': [delegation],
    }
    print(body)
    zones_url = self.url("/api/v1/servers/localhost/zones")
    json_headers = {'content-type': 'application/json'}
    response = self.session.post(zones_url, data=json.dumps(body), headers=json_headers)
    self.assert_success_json(response)
396
def test_create_zone_with_symbols(self):
    """Create a zone with '/' in its name and check the encoded id and the DB name."""
    name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
    name = payload['name']
    # The zone id is expected to carry '/' encoded as '=2F'.
    expected_id = name.replace('/', '=2F')
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
    self.assertEqual(data['id'], expected_id)
    dbrecs = get_db_records(name, 'SOA')
    self.assertEqual(dbrecs[0]['name'], name.rstrip('.'))
408
def test_create_zone_with_nameservers_non_string(self):
    """A non-string entry in 'nameservers' must be rejected with 422."""
    # ensure we don't crash
    name = unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': [{'a': 'ns1.example.com'}]  # invalid
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
423
def test_create_zone_with_dnssec(self):
    """
    Create a zone with "dnssec" set and see if a key was made.
    """
    name, payload, data = self.create_zone(dnssec=True)

    # The zone is fetched, but the checks below use the creation reply.
    self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    for k in ('dnssec', ):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))

    keys = r.json()

    print(keys)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(keys), 1)
    self.assertEqual(keys[0]['type'], 'Cryptokey')
    self.assertEqual(keys[0]['active'], True)
    self.assertEqual(keys[0]['keytype'], 'csk')
449
def test_create_zone_with_dnssec_disable_dnssec(self):
    """
    Create a zone with "dnssec", then set "dnssec" to false and see if the
    keys are gone
    """
    name, payload, data = self.create_zone(dnssec=True)

    # The PUT reply is not checked; the zone is re-fetched to see the result.
    self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
                     data=json.dumps({'dnssec': False}))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    zoneinfo = r.json()

    self.assertEqual(r.status_code, 200)
    self.assertEqual(zoneinfo['dnssec'], False)

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))

    keys = r.json()

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(keys), 0)
473
def test_create_zone_with_nsec3param(self):
    """
    Create a zone with "nsec3param" set and see if the metadata was added.
    """
    nsec3param = '1 0 500 aabbccddeeff'
    name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)

    # The zone is fetched, but the checks below use the creation reply.
    self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    for k in ('dnssec', 'nsec3param'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3PARAM'))

    data = r.json()

    print(data)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(data['metadata']), 1)
    self.assertEqual(data['kind'], 'NSEC3PARAM')
    self.assertEqual(data['metadata'][0], nsec3param)
499
def test_create_zone_with_nsec3narrow(self):
    """
    Create a zone with "nsec3narrow" set and see if the metadata was added.
    """
    nsec3param = '1 0 500 aabbccddeeff'
    name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
                                           nsec3narrow=True)

    # The zone is fetched, but the checks below use the creation reply.
    self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3NARROW'))

    data = r.json()

    print(data)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(data['metadata']), 1)
    self.assertEqual(data['kind'], 'NSEC3NARROW')
    self.assertEqual(data['metadata'][0], '1')
526
def test_create_zone_dnssec_serial(self):
    """
    Create a zone set/unset "dnssec" and see if the serial was increased
    after every step
    """
    name, payload, data = self.create_zone()

    # The serial is the third space-separated field of the SOA content;
    # only its last two digits are compared.
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    self.assertEqual(soa_serial[-2:], '01')

    self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
                     data=json.dumps({'dnssec': True}))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    data = r.json()
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]

    self.assertEqual(r.status_code, 200)
    self.assertEqual(soa_serial[-2:], '02')

    self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
                     data=json.dumps({'dnssec': False}))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    data = r.json()
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]

    self.assertEqual(r.status_code, 200)
    self.assertEqual(soa_serial[-2:], '03')
557
def test_zone_absolute_url(self):
    """The 'url' of the first listed zone must start with '/api/v'."""
    # Create a zone first so the listing has at least one entry from this test.
    self.create_zone()
    listing = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
    first_zone = listing[0]
    print(first_zone)
    self.assertTrue(first_zone['url'].startswith('/api/v'))
564
def test_create_zone_metadata(self):
    """POST an AXFR-SOURCE metadata entry to example.com and expect 201 with the values echoed."""
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    r = self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
                          data=json.dumps(payload_metadata))
    rdata = r.json()
    self.assertEqual(r.status_code, 201)
    self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
572
def test_create_zone_metadata_kind(self):
    """PUT metadata to the AXFR-SOURCE kind URL and expect 200 with the values echoed."""
    payload_metadata = {"metadata": ["127.0.0.2"]}
    r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
                         data=json.dumps(payload_metadata))
    rdata = r.json()
    self.assertEqual(r.status_code, 200)
    self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
580
def test_create_protected_zone_metadata(self):
    """PUT to each protected metadata kind must be rejected with 422."""
    # test whether it prevents modification of certain kinds
    for k in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
        payload = {"metadata": ["FOO", "BAR"]}
        r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k),
                             data=json.dumps(payload))
        self.assertEqual(r.status_code, 422)
588
def test_retrieve_zone_metadata(self):
    """POST a metadata entry, then check it is part of the zone's metadata listing."""
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    # The POST reply is not checked here; only the listing is.
    self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
                      data=json.dumps(payload_metadata))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata"))
    rdata = r.json()
    self.assertEqual(r.status_code, 200)
    self.assertIn(payload_metadata, rdata)
597
def test_delete_zone_metadata(self):
    """DELETE the AXFR-SOURCE metadata and check a following GET returns an empty list."""
    r = self.session.delete(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
    self.assertEqual(r.status_code, 200)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
    rdata = r.json()
    self.assertEqual(r.status_code, 200)
    self.assertEqual(rdata["metadata"], [])
605
def test_create_external_zone_metadata(self):
    """PUT a custom 'X-MYMETA' metadata kind and expect 200 with the values echoed."""
    payload_metadata = {"metadata": ["My very important message"]}
    r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
                         data=json.dumps(payload_metadata))
    self.assertEqual(r.status_code, 200)
    rdata = r.json()
    self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
613
def test_create_metadata_in_non_existent_zone(self):
    """POSTing metadata to a zone that does not exist must return 404."""
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    r = self.session.post(self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
                          data=json.dumps(payload_metadata))
    self.assertEqual(r.status_code, 404)
    # Note: errors should probably contain json (see #5988)
    # self.assertIn('Could not find domain ', r.json()['error'])
621
def test_create_slave_zone(self):
    """Create a Slave zone without nameservers and check reply, zone list and fetched zone."""
    # Test that nameservers can be absent for slave zones.
    name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    for k in ('name', 'masters', 'kind'):
        self.assertIn(k, data)
        self.assertEqual(data[k], payload[k])
    print("payload:", payload)
    print("data:", data)
    # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
    r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    zonelist = r.json()
    print("zonelist:", zonelist)
    self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
    # Also test that fetching the zone works.
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
    data = r.json()
    print("zone (fetched):", data)
    for k in ('name', 'masters', 'kind'):
        self.assertIn(k, data)
        self.assertEqual(data[k], payload[k])
    self.assertEqual(data['serial'], 0)
    self.assertEqual(data['rrsets'], [])
644
def test_delete_slave_zone(self):
    """Create a Slave zone and delete it again; an HTTP error status fails the test."""
    _, _, created = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    zone_url = self.url("/api/v1/servers/localhost/zones/" + created['id'])
    response = self.session.delete(zone_url)
    response.raise_for_status()
649
def test_retrieve_slave_zone(self):
    """Request an AXFR retrieve for a new Slave zone and check the result message."""
    _, sent, created = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    print("payload:", sent)
    print("data:", created)
    retrieve_url = self.url("/api/v1/servers/localhost/zones/" + created['id'] + "/axfr-retrieve")
    status = self.session.put(retrieve_url).json()
    print("status for axfr-retrieve:", status)
    expected_message = u'Added retrieval request for \'' + sent['name'] + '\' from master 127.0.0.2'
    self.assertEqual(status['result'], expected_message)
659
def test_notify_master_zone(self):
    """Request a notify for a new Master zone and check it is reported as queued."""
    _, sent, created = self.create_zone(kind='Master')
    print("payload:", sent)
    print("data:", created)
    notify_url = self.url("/api/v1/servers/localhost/zones/" + created['id'] + "/notify")
    status = self.session.put(notify_url).json()
    print("status for notify:", status)
    self.assertEqual(status['result'], 'Notification queued')
668
def test_get_zone_with_symbols(self):
    """Fetch a zone with '/' in its name through its '=2F'-encoded id."""
    name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
    name = payload['name']
    zone_id = (name.replace('/', '=2F'))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id))
    data = r.json()
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
679
def test_get_zone(self):
    """Look up example.com. in the zone list, fetch it by id and check its fields."""
    r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    domains = r.json()
    example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id']))
    self.assert_success_json(r)
    data = r.json()
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
        self.assertIn(k, data)
    self.assertEqual(data['name'], 'example.com.')
690
def test_import_zone_broken(self):
    """Importing zone text with a malformed line must be rejected with 422."""
    payload = {}
    # The 'flags:' line lacks the leading ';;' that the matching line in
    # test_import_zone_axfr has, so it is not a comment.
    payload['zone'] = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1680
;; QUESTION SECTION:
;powerdns.com. IN SOA

;; ANSWER SECTION:
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
powerdns-broken.com. 86400 IN A 82.94.213.34
powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
    payload['name'] = 'powerdns-broken.com.'
    payload['kind'] = 'Master'
    payload['nameservers'] = []
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
720
def test_import_zone_axfr_outofzone(self):
    """Importing zone text with a record outside the zone must be rejected with 422."""
    # Ensure we don't create out-of-zone records
    name = unique_zone_name()
    payload = {}
    # 'NAME' is a placeholder replaced by the generated zone name.
    payload['zone'] = """
NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
NAME 3600 IN NS powerdnssec2.ds9a.nl.
example.org. 3600 IN AAAA 2001:888:2000:1d::2
NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
""".replace('NAME', name)
    payload['name'] = name
    payload['kind'] = 'Master'
    payload['nameservers'] = []
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertEqual(r.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
740
def test_import_zone_axfr(self):
    """Import a zone from AXFR-style text and compare the resulting rrsets and a DB row."""
    zone_text = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1680
;; QUESTION SECTION:
;powerdns.com. IN SOA

;; ANSWER SECTION:
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
powerdns.com. 86400 IN A 82.94.213.34
powerdns.com. 3600 IN MX 0 xs.powerdns.com.
powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
    payload = {
        'zone': zone_text,
        'name': 'powerdns.com.',
        'kind': 'Master',
        'nameservers': [],
        'soa_edit_api': '',  # turn off so exact SOA comparison works.
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
    created = response.json()
    self.assertIn('name', created)

    # Types whose entries carry no 'name' are compared by content only.
    expected = {
        'NS': [
            {'content': 'powerdnssec1.ds9a.nl.'},
            {'content': 'powerdnssec2.ds9a.nl.'},
        ],
        'SOA': [
            {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
        ],
        'MX': [
            {'content': '0 xs.powerdns.com.'},
        ],
        'A': [
            {'content': '82.94.213.34', 'name': 'powerdns.com.'},
        ],
        'AAAA': [
            {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
        ],
    }

    eq_zone_rrsets(created['rrsets'], expected)

    # check content in DB is stored WITHOUT trailing dot.
    ns_rows = get_db_records(payload['name'], 'NS')
    powerdnssec1_rows = (row for row in ns_rows if row['content'].startswith('powerdnssec1'))
    first_match = next(powerdnssec1_rows)
    self.assertEqual(first_match['content'], 'powerdnssec1.ds9a.nl')
799
def test_import_zone_bind(self):
    """Import a zone from BIND master-file text and compare the resulting rrsets."""
    # NOTE(review): leading whitespace inside this zone text was lost in the
    # copy this was reconstructed from. It is restored on the SOA
    # continuation lines and on the owner-less NS/MX lines, which need it to
    # inherit the apex owner as the expectations below require. Confirm the
    # exact spacing against the upstream file.
    zone_text = """
$TTL 86400 ; 24 hours could have been written as 24h or 1d
; $TTL used for all RRs without explicit TTL value
$ORIGIN example.org.
@ 1D IN SOA ns1.example.org. hostmaster.example.org. (
    2002022401 ; serial
    3H ; refresh
    15 ; retry
    1w ; expire
    3h ; minimum
    )
    IN NS ns1.example.org. ; in the domain
    IN NS ns2.smokeyjoe.com. ; external to domain
    IN MX 10 mail.another.com. ; external mail provider
; server host definitions
ns1 IN A 192.168.0.1 ;name server definition
www IN A 192.168.0.2 ;web server definition
ftp IN CNAME www.example.org. ;ftp server definition
; non server domain hosts
bill IN A 192.168.0.3
fred IN A 192.168.0.4
"""
    payload = {
        'zone': zone_text,
        'name': 'example.org.',
        'kind': 'Master',
        'nameservers': [],
        'soa_edit_api': '',  # turn off so exact SOA comparison works.
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
    created = response.json()
    self.assertIn('name', created)

    # Types whose entries carry no 'name' are compared by content only.
    expected = {
        'NS': [
            {'content': 'ns1.example.org.'},
            {'content': 'ns2.smokeyjoe.com.'},
        ],
        'SOA': [
            {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
        ],
        'MX': [
            {'content': '10 mail.another.com.'},
        ],
        'A': [
            {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
            {'content': '192.168.0.2', 'name': 'www.example.org.'},
            {'content': '192.168.0.3', 'name': 'bill.example.org.'},
            {'content': '192.168.0.4', 'name': 'fred.example.org.'},
        ],
        'CNAME': [
            {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
        ],
    }

    eq_zone_rrsets(created['rrsets'], expected)
859
def test_export_zone_json(self):
    """Export a new zone with a JSON-preferring Accept header and compare the zone lines."""
    name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    r = self.session.get(
        self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
        headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
    )
    self.assert_success_json(r)
    data = r.json()
    self.assertIn('zone', data)
    # One tab-separated line per record.
    expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
                     name + '\t3600\tIN\tNS\tns2.foo.com.',
                     name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
                     ' 0 10800 3600 604800 3600']
    self.assertEqual(data['zone'].strip().split('\n'), expected_data)
875
def test_export_zone_text(self):
    """Export a new zone with 'accept: */*' and compare the lines of the response text."""
    name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    r = self.session.get(
        self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
        headers={'accept': '*/*'}
    )
    data = r.text.strip().split("\n")
    # One tab-separated line per record.
    expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
                     name + '\t3600\tIN\tNS\tns2.foo.com.',
                     name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
                     ' 0 10800 3600 604800 3600']
    self.assertEqual(data, expected_data)
889
def test_update_zone(self):
    """PUT zone settings twice and check each time that a re-fetch reflects them."""
    name, payload, zone = self.create_zone()
    name = payload['name']
    # update, set as Master and enable SOA-EDIT-API
    payload = {
        'kind': 'Master',
        'masters': ['192.0.2.1', '192.0.2.2'],
        'soa_edit_api': 'EPOCH',
        'soa_edit': 'EPOCH'
    }
    r = self.session.put(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    for k in payload.keys():
        self.assertIn(k, data)
        self.assertEqual(data[k], payload[k])
    # update, back to Native and empty(off)
    payload = {
        'kind': 'Native',
        'soa_edit_api': '',
        'soa_edit': ''
    }
    r = self.session.put(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    for k in payload.keys():
        self.assertIn(k, data)
        self.assertEqual(data[k], payload[k])
924
def test_zone_rr_update(self):
    # Replace the apex NS rrset (one enabled, one disabled record) and
    # check that the zone then reports exactly those records.
    name, payload, zone = self.create_zone()
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        # lower-case type is sent as-is; the result is looked up as 'NS' below
        'type': 'ns',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            },
            {
                "content": "ns2-disabled.bar.com.",
                "disabled": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that (only) the new record is there
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertEqual(get_rrset(data, name, 'NS')['records'], rrset['records'])
953
def test_zone_rr_update_mx(self):
    # Important to test with MX records, as they have a priority field, which must end up in the content field.
    name, payload, zone = self.create_zone()
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'MX',
        'ttl': 3600,
        'records': [
            {
                "content": "10 mail.example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that (only) the new record is there
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset['records'])
979
def test_zone_rr_update_opt(self):
    # An rrset of type OPT must be rejected with HTTP 422 and an
    # 'invalid type' error message.
    name, payload, zone = self.create_zone()
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'OPT',
        'ttl': 3600,
        'records': [
            {
                "content": "9",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('OPT: invalid type given', r.json()['error'])
1002
def test_zone_rr_update_multiple_rrsets(self):
    # Replace two different rrsets (NS and MX) in a single PATCH and check
    # that both are reported back with the new records.
    name, payload, zone = self.create_zone()
    rrset1 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns9999.example.com.",
                "disabled": False
            }
        ]
    }
    rrset2 = {
        'changetype': 'replace',
        'name': name,
        'type': 'MX',
        'ttl': 3600,
        'records': [
            {
                "content": "10 mx444.example.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset1, rrset2]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that all rrsets have been updated
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertEqual(get_rrset(data, name, 'NS')['records'], rrset1['records'])
    self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset2['records'])
1040
def test_zone_rr_update_duplicate_record(self):
    # An rrset listing the same record content twice (first and last
    # entries below) must be rejected with HTTP 422.
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {"content": "ns9999.example.com.", "disabled": False},
            {"content": "ns9996.example.com.", "disabled": False},
            {"content": "ns9987.example.com.", "disabled": False},
            {"content": "ns9988.example.com.", "disabled": False},
            {"content": "ns9999.example.com.", "disabled": False},
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Duplicate record in RRset', r.json()['error'])
1063
def test_zone_rr_update_duplicate_rrset(self):
    # A PATCH carrying two rrsets with the same name and type must be
    # rejected with HTTP 422.
    name, payload, zone = self.create_zone()
    rrset1 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns9999.example.com.",
                "disabled": False
            }
        ]
    }
    rrset2 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns9998.example.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset1, rrset2]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Duplicate RRset', r.json()['error'])
1097
def test_zone_rr_delete(self):
    # Deleting the apex NS rrset must leave the zone without any NS rrset.
    name, _created_payload, _created_zone = self.create_zone()
    # the NS records were created together with the zone; remove all of them
    deletion = dict(changetype='delete', name=name, type='NS')
    body = json.dumps({'rrsets': [deletion]})
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=body,
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # re-read the zone and make sure no NS rrset is listed any more
    refreshed = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    remaining_ns = get_rrset(refreshed, name, 'NS')
    self.assertIsNone(remaining_ns)
1115
def test_zone_disable_reenable(self):
    # This also tests that SOA-EDIT-API works.
    name, payload, zone = self.create_zone(soa_edit_api='EPOCH')
    # disable zone by disabling SOA
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'SOA',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
                "disabled": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # check SOA serial has been edited (third whitespace-separated field
    # of the SOA content; '1' is what was submitted above)
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    soa_serial1 = get_first_rec(data, name, 'SOA')['content'].split()[2]
    self.assertNotEqual(soa_serial1, '1')
    # make sure domain is still in zone list (disabled SOA!)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    domains = r.json()
    self.assertEqual(len([domain for domain in domains if domain['name'] == name]), 1)
    # sleep 1sec to ensure the EPOCH value changes for the next request
    time.sleep(1)
    # verify that modifying it still works
    rrset['records'][0]['disabled'] = False
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # check SOA serial has been edited again
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    soa_serial2 = get_first_rec(data, name, 'SOA')['content'].split()[2]
    self.assertNotEqual(soa_serial2, '1')
    self.assertNotEqual(soa_serial2, soa_serial1)
1161
def test_zone_rr_update_out_of_zone(self):
    # A replace whose rrset name lies outside the zone must be rejected
    # with HTTP 422 and an 'out of zone' error.
    name, payload, zone = self.create_zone()
    # replace with qname mismatch
    rrset = {
        'changetype': 'replace',
        'name': 'not-in-zone.',
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('out of zone', r.json()['error'])
1184
def test_zone_rr_update_restricted_chars(self):
    # A replace whose rrset name contains an unsupported character must be
    # rejected with HTTP 422.
    name, payload, zone = self.create_zone()
    # replace using a qname inside the zone that contains a ':' character
    rrset = {
        'changetype': 'replace',
        'name': 'test:' + name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('contains unsupported characters', r.json()['error'])
1207
def test_rrset_unknown_type(self):
    # An rrset with a made-up record type must be rejected with HTTP 422
    # and an 'unknown type' error.
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'FAFAFA',
        'ttl': 3600,
        'records': [
            {
                "content": "4.3.2.1",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('unknown type', r.json()['error'])
1227
def test_rrset_cname_and_other(self):
    # Adding a CNAME at the zone name of a freshly created zone must be
    # rejected with HTTP 422 because of the already existing non-CNAME rrsets.
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'CNAME',
        'ttl': 3600,
        'records': [
            {
                "content": "example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Conflicts with pre-existing non-CNAME RRset', r.json()['error'])
1247
def test_rrset_other_and_cname(self):
    # First create a CNAME at sub.<zone>, then try to add an A rrset at
    # the same name: the second PATCH must be rejected with HTTP 422.
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': 'sub.'+name,
        'type': 'CNAME',
        'ttl': 3600,
        'records': [
            {
                "content": "example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assert_success(r)
    rrset = {
        'changetype': 'replace',
        'name': 'sub.'+name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": "1.2.3.4",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Conflicts with pre-existing CNAME RRset', r.json()['error'])
1283
def test_create_zone_with_leading_space(self):
    # Actual regression.
    # Record content with a leading space must be rejected with HTTP 422.
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": " 4.3.2.1",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Not in expected format', r.json()['error'])
1304
def test_zone_rr_delete_out_of_zone(self):
    # A delete for a name outside the zone is expected to be accepted.
    name, _created_payload, _created_zone = self.create_zone()
    stray = dict(changetype='delete', name='not-in-zone.', type='NS')
    body = json.dumps({'rrsets': [stray]})
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=body,
        headers={'content-type': 'application/json'})
    print(response.content)
    # succeed so users can fix their wrong, old data
    self.assert_success(response)
1319
def test_zone_delete(self):
    # Deleting a zone must answer 204 and send no Content-Type header.
    name, payload, zone = self.create_zone()
    r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
    self.assertEqual(r.status_code, 204)
    self.assertNotIn('Content-Type', r.headers)
1325
def test_zone_comment_create(self):
    # Send a replace for the NS rrset that carries only comments (no
    # 'records' key) and check comments, records and TTL afterwards.
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'comments': [
            {
                'account': 'test1',
                'content': 'blah blah',
            },
            {
                'account': 'test2',
                'content': 'blah blah bleh',
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # make sure the comments have been set, and that the NS
    # records are still present
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    serverset = get_rrset(data, name, 'NS')
    print(serverset)
    self.assertNotEqual(serverset['records'], [])
    self.assertNotEqual(serverset['comments'], [])
    # verify that modified_at has been set by pdns
    self.assertNotEqual(serverset['comments'][0]['modified_at'], 0)
    # verify that TTL is correct (regression test)
    self.assertEqual(serverset['ttl'], 3600)
1361
def test_zone_comment_delete(self):
    # Test: Delete ONLY comments.
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        # empty comment list and no 'records' key: only comments are touched
        'comments': []
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # make sure the NS records are still present
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    serverset = get_rrset(data, name, 'NS')
    print(serverset)
    self.assertNotEqual(serverset['records'], [])
    self.assertEqual(serverset['comments'], [])
1383
def test_zone_comment_stay_intact(self):
    # Test if comments on an rrset stay intact if the rrset is replaced
    name, payload, zone = self.create_zone()
    # create a comment
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'comments': [
            {
                'account': 'test1',
                'content': 'oh hi there',
                # explicit timestamp so the comment can be compared verbatim below
                'modified_at': 1111
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # replace rrset records
    rrset2 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            }
        ]
    }
    payload2 = {'rrsets': [rrset2]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload2),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # make sure the comments still exist
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    serverset = get_rrset(data, name, 'NS')
    print(serverset)
    self.assertEqual(serverset['records'], rrset2['records'])
    self.assertEqual(serverset['comments'], rrset['comments'])
1431
def test_zone_auto_ptr_ipv4_create(self):
    # Create a zone whose initial A record has 'set-ptr' enabled and check
    # that the matching PTR rrset shows up in the reverse zone.
    revzone = '4.2.192.in-addr.arpa.'
    _, _, revzonedata = self.create_zone(name=revzone)
    name = unique_zone_name()
    rrset = {
        "name": name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "192.2.4.44",
            "disabled": False,
            "set-ptr": True,
        }],
    }
    name, payload, data = self.create_zone(name=name, rrsets=[rrset])
    # drop 'set-ptr' from the local copy before comparing with the stored records
    del rrset['records'][0]['set-ptr']
    self.assertEqual(get_rrset(data, name, 'A')['records'], rrset['records'])
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
    revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
    print(revsets)
    self.assertEqual(revsets, [{
        u'name': u'44.4.2.192.in-addr.arpa.',
        u'ttl': 3600,
        u'type': u'PTR',
        u'comments': [],
        u'records': [{
            u'content': name,
            u'disabled': False,
        }],
    }])
    # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
    self.assertGreater(r['serial'], revzonedata['serial'])
1464
def test_zone_auto_ptr_ipv4_update(self):
    # PATCH an A record with 'set-ptr' enabled into an existing zone and
    # check that the matching PTR rrset shows up in the reverse zone.
    revzone = '0.2.192.in-addr.arpa.'
    _, _, revzonedata = self.create_zone(name=revzone)
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": '192.2.0.2',
                "disabled": False,
                "set-ptr": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
    revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
    print(revsets)
    self.assertEqual(revsets, [{
        u'name': u'2.0.2.192.in-addr.arpa.',
        u'ttl': 3600,
        u'type': u'PTR',
        u'comments': [],
        u'records': [{
            u'content': name,
            u'disabled': False,
        }],
    }])
    # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
    self.assertGreater(r['serial'], revzonedata['serial'])
1503
def test_zone_auto_ptr_ipv6_update(self):
    # PATCH an AAAA record with 'set-ptr' enabled into an existing zone and
    # check that the matching PTR rrset shows up in the ip6.arpa reverse zone.
    # 2001:DB8::bb:aa
    revzone = '8.b.d.0.1.0.0.2.ip6.arpa.'
    _, _, revzonedata = self.create_zone(name=revzone)
    name, payload, zone = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'AAAA',
        'ttl': 3600,
        'records': [
            {
                "content": '2001:DB8::bb:aa',
                "disabled": False,
                "set-ptr": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
    revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
    print(revsets)
    self.assertEqual(revsets, [{
        u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
        u'ttl': 3600,
        u'type': u'PTR',
        u'comments': [],
        u'records': [{
            u'content': name,
            u'disabled': False,
        }],
    }])
    # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
    self.assertGreater(r['serial'], revzonedata['serial'])
1543
def test_search_rr_exact_zone(self):
    # Search for the exact zone name (queried without its trailing dot) and
    # compare the complete result list: the zone, both NS records and the SOA.
    name = unique_zone_name()
    self.create_zone(name=name, serial=22, soa_edit_api='')
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')))
    self.assert_success_json(r)
    print(r.json())
    self.assertEqual(r.json(), [
        {u'object_type': u'zone', u'name': name, u'zone_id': name},
        {u'content': u'ns1.example.com.',
         u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
         u'ttl': 3600, u'type': u'NS', u'name': name},
        {u'content': u'ns2.example.com.',
         u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
         u'ttl': 3600, u'type': u'NS', u'name': name},
        {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
         u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
         u'ttl': 3600, u'type': u'SOA', u'name': name},
    ])
1562
def test_search_rr_substring(self):
    # Search with a wildcard pattern built from the middle of the zone name.
    name = unique_zone_name()
    # drop the first and last five characters to get an inner substring
    search = name[5:-5]
    self.create_zone(name=name)
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
    self.assert_success_json(r)
    print(r.json())
    # should return zone, SOA, ns1, ns2
    self.assertEqual(len(r.json()), 4)
1572
def test_search_rr_case_insensitive(self):
    # The zone name ends in lower-case 'testsuffix.' while the query uses
    # mixed case; the search is still expected to find the zone's entries.
    name = unique_zone_name()+'testsuffix.'
    self.create_zone(name=name)
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
    self.assert_success_json(r)
    print(r.json())
    # should return zone, SOA, ns1, ns2
    self.assertEqual(len(r.json()), 4)
1581
def test_search_after_rectify_with_ent(self):
    # Create a zone with a record two labels below the apex, rectify it via
    # pdnsutil_rectify, and check the number of search results.
    name = unique_zone_name()
    # first label of the zone name is used as the search term
    search = name.split('.')[0]
    rrset = {
        "name": 'sub.sub.' + name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "4.3.2.1",
            "disabled": False,
        }],
    }
    self.create_zone(name=name, rrsets=[rrset])
    pdnsutil_rectify(name)
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
    self.assert_success_json(r)
    print(r.json())
    # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
    self.assertEqual(len(r.json()), 5)
1601
def test_cname_at_ent_place(self):
    # With api_rectify enabled, first add an A record at sub2.sub1.<zone>,
    # then add a CNAME at sub1.<zone>; both PATCH requests must answer 204.
    name, payload, zone = self.create_zone(api_rectify=True)
    rrset = {
        'changetype': 'replace',
        'name': 'sub2.sub1.' + name,
        'type': "A",
        'ttl': 3600,
        'records': [{
            'content': "4.3.2.1",
            'disabled': False,
        }],
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone['id']),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 204)
    rrset = {
        'changetype': 'replace',
        'name': 'sub1.' + name,
        'type': "CNAME",
        'ttl': 3600,
        'records': [{
            'content': "www.example.org.",
            'disabled': False,
        }],
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone['id']),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 204)
1636
def test_rrset_parameter_post_false(self):
    # Creating a zone with ?rrsets=false must answer 201 with a JSON body
    # that has no 'rrsets' entry.
    name = unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.', 'ns2.example.com.']
    }
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones?rrsets=false"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    # check the response before decoding it for the debug print, so a failed
    # request is reported by the assertion rather than by a JSON decode error
    self.assert_success_json(r)
    print(r.json())
    self.assertEqual(r.status_code, 201)
    self.assertIsNone(r.json().get('rrsets'))
1652
def test_rrset_false_parameter(self):
    # Fetching a zone with ?rrsets=false must return JSON without 'rrsets'.
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false"))
    self.assert_success_json(r)
    print(r.json())
    self.assertIsNone(r.json().get('rrsets'))
1660
def test_rrset_true_parameter(self):
    # Fetching a zone with ?rrsets=true must include its rrsets; two are
    # expected for a freshly created zone.
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true"))
    self.assert_success_json(r)
    print(r.json())
    self.assertEqual(len(r.json().get('rrsets')), 2)
1668
def test_wrong_rrset_parameter(self):
    # An unsupported value for the 'rrsets' query parameter must be
    # rejected with HTTP 422 and an explanatory error message.
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=foobar"))
    self.assertEqual(r.status_code, 422)
    self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
1675
1676
@unittest.skipIf(not is_auth(), "Not applicable")
class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
    # Tests creating and updating the root zone ('.'), which is addressed
    # through the zone id '=2E' in API URLs.

    def setUp(self):
        super(AuthRootZone, self).setUp()
        # zone name is not unique, so delete the zone before each individual test.
        self.session.delete(self.url("/api/v1/servers/localhost/zones/=2E"))

    def test_create_zone(self):
        # Create the root zone and check the returned fields, the generated
        # SOA, the zone list and a fetch of the zone by its id.
        name, payload, data = self.create_zone(name='.', serial=22, soa_edit_api='')
        for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
            self.assertIn(k, data)
            if k in payload:
                self.assertEqual(data[k], payload[k])
        # validate generated SOA
        rec = get_first_rec(data, '.', 'SOA')
        self.assertEqual(
            rec['content'],
            "a.misconfigured.powerdns.server. hostmaster. " + str(payload['serial']) +
            " 10800 3600 604800 3600"
        )
        # Regression test: verify zone list works
        zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
        print("zonelist:", zonelist)
        self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
        # Also test that fetching the zone works.
        print("id:", data['id'])
        self.assertEqual(data['id'], '=2E')
        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
        print("zone (fetched):", data)
        for k in ('name', 'kind'):
            self.assertIn(k, data)
            self.assertEqual(data[k], payload[k])
        self.assertEqual(data['rrsets'][0]['name'], '.')

    def test_update_zone(self):
        # PUT root zone metadata twice (to Master with SOA-EDIT settings,
        # then back to Native with them cleared) and check after each update
        # that a GET returns every field that was sent.
        name, payload, zone = self.create_zone(name='.')
        zone_id = '=2E'
        # update, set as Master and enable SOA-EDIT-API
        payload = {
            'kind': 'Master',
            'masters': ['192.0.2.1', '192.0.2.2'],
            'soa_edit_api': 'EPOCH',
            'soa_edit': 'EPOCH'
        }
        r = self.session.put(
            self.url("/api/v1/servers/localhost/zones/" + zone_id),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success(r)
        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
        for k in payload:
            self.assertIn(k, data)
            self.assertEqual(data[k], payload[k])
        # update, back to Native and empty(off)
        payload = {
            'kind': 'Native',
            'soa_edit_api': '',
            'soa_edit': ''
        }
        r = self.session.put(
            self.url("/api/v1/servers/localhost/zones/" + zone_id),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success(r)
        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
        for k in payload:
            self.assertIn(k, data)
            self.assertEqual(data[k], payload[k])
1746
1747
@unittest.skipIf(not is_recursor(), "Not applicable")
class RecursorZones(ApiTestCase):
    # Zone API tests that only run when is_recursor() is true.

    def create_zone(self, name=None, kind=None, rd=False, servers=None):
        # POST a new zone and return (payload, decoded JSON response).
        # A unique zone name is generated when none is given; 'servers'
        # defaults to an empty list.
        if name is None:
            name = unique_zone_name()
        if servers is None:
            servers = []
        payload = {
            'name': name,
            'kind': kind,
            'servers': servers,
            'recursion_desired': rd
        }
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success_json(r)
        return payload, r.json()

    def test_create_auth_zone(self):
        # Every field sent on creation must be echoed back unchanged.
        payload, data = self.create_zone(kind='Native')
        for k in payload:
            self.assertEqual(data[k], payload[k])

    def test_create_zone_no_name(self):
        # An empty zone name must be rejected with HTTP 422.
        payload = {
            'name': '',
            'kind': 'Native',
            'servers': ['8.8.8.8'],
            'recursion_desired': False,
        }
        print(payload)
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assertEqual(r.status_code, 422)
        self.assertIn('is not canonical', r.json()['error'])

    def test_create_forwarded_zone(self):
        payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
        # return values are normalized
        payload['servers'][0] += ':53'
        for k in payload:
            self.assertEqual(data[k], payload[k])

    def test_create_forwarded_rd_zone(self):
        payload, data = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
        # return values are normalized
        payload['servers'][0] += ':53'
        for k in payload:
            self.assertEqual(data[k], payload[k])

    def test_create_auth_zone_with_symbols(self):
        # A '/' in the zone name is expected to appear as '=2F' in the zone id.
        payload, data = self.create_zone(name='foo/bar.'+unique_zone_name(), kind='Native')
        expected_id = (payload['name'].replace('/', '=2F'))
        for k in payload:
            self.assertEqual(data[k], payload[k])
        self.assertEqual(data['id'], expected_id)

    def test_rename_auth_zone(self):
        # PUT a payload with a new name to the old zone URL, then fetch the
        # zone under the new name and compare the fields that were sent.
        payload, data = self.create_zone(kind='Native')
        name = payload['name']
        # now rename it
        payload = {
            'name': 'renamed-'+name,
            'kind': 'Native',
            'recursion_desired': False
        }
        r = self.session.put(
            self.url("/api/v1/servers/localhost/zones/" + name),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success(r)
        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + payload['name'])).json()
        for k in payload:
            self.assertEqual(data[k], payload[k])

    def test_zone_delete(self):
        # Deleting a zone must answer 204 and send no Content-Type header.
        payload, zone = self.create_zone(kind='Native')
        name = payload['name']
        r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
        self.assertEqual(r.status_code, 204)
        self.assertNotIn('Content-Type', r.headers)

    def test_search_rr_exact_zone(self):
        # Searching for the exact zone name must return just the zone entry.
        name = unique_zone_name()
        self.create_zone(name=name, kind='Native')
        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name))
        self.assert_success_json(r)
        print(r.json())
        self.assertEqual(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name}])

    def test_search_rr_substring(self):
        # Searching for a substring of the zone name must return two results.
        name = 'search-rr-zone.name.'
        self.create_zone(name=name, kind='Native')
        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
        self.assert_success_json(r)
        print(r.json())
        # should return zone, SOA
        self.assertEqual(len(r.json()), 2)
1851
1852 @unittest.skipIf(not is_auth(), "Not applicable")
1853 class AuthZoneKeys(ApiTestCase, AuthZonesHelperMixin):
1854
1855 def test_get_keys(self):
1856 r = self.session.get(
1857 self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
1858 self.assert_success_json(r)
1859 keys = r.json()
1860 self.assertGreater(len(keys), 0)
1861
1862 key0 = deepcopy(keys[0])
1863 del key0['dnskey']
1864 del key0['ds']
1865 expected = {
1866 u'algorithm': u'ECDSAP256SHA256',
1867 u'bits': 256,
1868 u'active': True,
1869 u'type': u'Cryptokey',
1870 u'keytype': u'csk',
1871 u'flags': 257,
1872 u'id': 1}
1873 self.assertEquals(key0, expected)
1874
1875 keydata = keys[0]['dnskey'].split()
1876 self.assertEqual(len(keydata), 4)