]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
Merge pull request #7053 from mind04/doc-warnings
[thirdparty/pdns.git] / regression-tests.api / test_Zones.py
1 from __future__ import print_function
2 import json
3 import time
4 import unittest
5 from copy import deepcopy
6 from pprint import pprint
7 from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor, get_db_records, pdnsutil_rectify
8
9
10 def get_rrset(data, qname, qtype):
11 for rrset in data['rrsets']:
12 if rrset['name'] == qname and rrset['type'] == qtype:
13 return rrset
14 return None
15
16
17 def get_first_rec(data, qname, qtype):
18 rrset = get_rrset(data, qname, qtype)
19 if rrset:
20 return rrset['records'][0]
21 return None
22
23
24 def eq_zone_rrsets(rrsets, expected):
25 data_got = {}
26 data_expected = {}
27 for type_, expected_records in expected.items():
28 type_ = str(type_)
29 data_got[type_] = set()
30 data_expected[type_] = set()
31 uses_name = any(['name' in expected_record for expected_record in expected_records])
32 # minify + convert received data
33 for rrset in [rrset for rrset in rrsets if rrset['type'] == type_]:
34 print(rrset)
35 for r in rrset['records']:
36 data_got[type_].add((rrset['name'] if uses_name else '@', rrset['type'], r['content']))
37 # minify expected data
38 for r in expected_records:
39 data_expected[type_].add((r['name'] if uses_name else '@', type_, r['content']))
40
41 print("eq_zone_rrsets: got:")
42 pprint(data_got)
43 print("eq_zone_rrsets: expected:")
44 pprint(data_expected)
45
46 assert data_got == data_expected, "%r != %r" % (data_got, data_expected)
47
48
49 class Zones(ApiTestCase):
50
51 def test_list_zones(self):
52 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
53 self.assert_success_json(r)
54 domains = r.json()
55 example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
56 self.assertEquals(len(example_com), 1)
57 example_com = example_com[0]
58 print(example_com)
59 required_fields = ['id', 'url', 'name', 'kind']
60 if is_auth():
61 required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial', 'account']
62 self.assertNotEquals(example_com['serial'], 0)
63 elif is_recursor():
64 required_fields = required_fields + ['recursion_desired', 'servers']
65 for field in required_fields:
66 self.assertIn(field, example_com)
67
68
69 class AuthZonesHelperMixin(object):
70 def create_zone(self, name=None, **kwargs):
71 if name is None:
72 name = unique_zone_name()
73 payload = {
74 'name': name,
75 'kind': 'Native',
76 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
77 }
78 for k, v in kwargs.items():
79 if v is None:
80 del payload[k]
81 else:
82 payload[k] = v
83 print("sending", payload)
84 r = self.session.post(
85 self.url("/api/v1/servers/localhost/zones"),
86 data=json.dumps(payload),
87 headers={'content-type': 'application/json'})
88 self.assert_success_json(r)
89 self.assertEquals(r.status_code, 201)
90 reply = r.json()
91 print("reply", reply)
92 return name, payload, reply
93
94
95 @unittest.skipIf(not is_auth(), "Not applicable")
96 class AuthZones(ApiTestCase, AuthZonesHelperMixin):
97
98 def test_create_zone(self):
99 # soa_edit_api has a default, override with empty for this test
100 name, payload, data = self.create_zone(serial=22, soa_edit_api='')
101 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
102 self.assertIn(k, data)
103 if k in payload:
104 self.assertEquals(data[k], payload[k])
105 # validate generated SOA
106 expected_soa = "a.misconfigured.powerdns.server. hostmaster." + name + " " + \
107 str(payload['serial']) + " 10800 3600 604800 3600"
108 self.assertEquals(
109 get_first_rec(data, name, 'SOA')['content'],
110 expected_soa
111 )
112 # Because we had confusion about dots, check that the DB is without dots.
113 dbrecs = get_db_records(name, 'SOA')
114 self.assertEqual(dbrecs[0]['content'], expected_soa.replace('. ', ' '))
115
116 def test_create_zone_with_soa_edit_api(self):
117 # soa_edit_api wins over serial
118 name, payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
119 for k in ('soa_edit_api', ):
120 self.assertIn(k, data)
121 if k in payload:
122 self.assertEquals(data[k], payload[k])
123 # generated EPOCH serial surely is > fixed serial we passed in
124 print(data)
125 self.assertGreater(data['serial'], payload['serial'])
126 soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
127 self.assertGreater(soa_serial, payload['serial'])
128 self.assertEquals(soa_serial, data['serial'])
129
130 def test_create_zone_with_account(self):
131 # soa_edit_api wins over serial
132 name, payload, data = self.create_zone(account='anaccount', serial=10)
133 print(data)
134 for k in ('account', ):
135 self.assertIn(k, data)
136 if k in payload:
137 self.assertEquals(data[k], payload[k])
138
139 def test_create_zone_default_soa_edit_api(self):
140 name, payload, data = self.create_zone()
141 print(data)
142 self.assertEquals(data['soa_edit_api'], 'DEFAULT')
143
144 def test_create_zone_exists(self):
145 name, payload, data = self.create_zone()
146 print(data)
147 payload = {
148 'name': name,
149 'kind': 'Native'
150 }
151 print(payload)
152 r = self.session.post(
153 self.url("/api/v1/servers/localhost/zones"),
154 data=json.dumps(payload),
155 headers={'content-type': 'application/json'})
156 self.assertEquals(r.status_code, 409) # Conflict - already exists
157
158 def test_create_zone_with_soa_edit(self):
159 name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
160 print(data)
161 self.assertEquals(data['soa_edit'], 'INCEPTION-INCREMENT')
162 self.assertEquals(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
163 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
164 # These particular settings lead to the first serial set to YYYYMMDD01.
165 self.assertEquals(soa_serial[-2:], '01')
166 rrset = {
167 'changetype': 'replace',
168 'name': name,
169 'type': 'A',
170 'ttl': 3600,
171 'records': [
172 {
173 "content": "127.0.0.1",
174 "disabled": False
175 }
176 ]
177 }
178 payload = {'rrsets': [rrset]}
179 self.session.patch(
180 self.url("/api/v1/servers/localhost/zones/" + data['id']),
181 data=json.dumps(payload),
182 headers={'content-type': 'application/json'})
183 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
184 data = r.json()
185 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
186 self.assertEquals(soa_serial[-2:], '02')
187
188 def test_create_zone_with_records(self):
189 name = unique_zone_name()
190 rrset = {
191 "name": name,
192 "type": "A",
193 "ttl": 3600,
194 "records": [{
195 "content": "4.3.2.1",
196 "disabled": False,
197 }],
198 }
199 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
200 # check our record has appeared
201 self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
202
203 def test_create_zone_with_wildcard_records(self):
204 name = unique_zone_name()
205 rrset = {
206 "name": "*."+name,
207 "type": "A",
208 "ttl": 3600,
209 "records": [{
210 "content": "4.3.2.1",
211 "disabled": False,
212 }],
213 }
214 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
215 # check our record has appeared
216 self.assertEquals(get_rrset(data, rrset['name'], 'A')['records'], rrset['records'])
217
218 def test_create_zone_with_comments(self):
219 name = unique_zone_name()
220 rrsets = [
221 {
222 "name": name,
223 "type": "soa", # test uppercasing of type, too.
224 "comments": [{
225 "account": "test1",
226 "content": "blah blah",
227 "modified_at": 11112,
228 }],
229 },
230 {
231 "name": name,
232 "type": "AAAA",
233 "ttl": 3600,
234 "records": [{
235 "content": "2001:DB8::1",
236 "disabled": False,
237 }],
238 "comments": [{
239 "account": "test AAAA",
240 "content": "blah blah AAAA",
241 "modified_at": 11112,
242 }],
243 },
244 {
245 "name": name,
246 "type": "TXT",
247 "ttl": 3600,
248 "records": [{
249 "content": "\"test TXT\"",
250 "disabled": False,
251 }],
252 },
253 {
254 "name": name,
255 "type": "A",
256 "ttl": 3600,
257 "records": [{
258 "content": "192.0.2.1",
259 "disabled": False,
260 }],
261 },
262 ]
263 name, payload, data = self.create_zone(name=name, rrsets=rrsets)
264 # NS records have been created
265 self.assertEquals(len(data['rrsets']), len(rrsets) + 1)
266 # check our comment has appeared
267 self.assertEquals(get_rrset(data, name, 'SOA')['comments'], rrsets[0]['comments'])
268 self.assertEquals(get_rrset(data, name, 'A')['comments'], [])
269 self.assertEquals(get_rrset(data, name, 'TXT')['comments'], [])
270 self.assertEquals(get_rrset(data, name, 'AAAA')['comments'], rrsets[1]['comments'])
271
272 def test_create_zone_uncanonical_nameservers(self):
273 name = unique_zone_name()
274 payload = {
275 'name': name,
276 'kind': 'Native',
277 'nameservers': ['uncanon.example.com']
278 }
279 print(payload)
280 r = self.session.post(
281 self.url("/api/v1/servers/localhost/zones"),
282 data=json.dumps(payload),
283 headers={'content-type': 'application/json'})
284 self.assertEquals(r.status_code, 422)
285 self.assertIn('Nameserver is not canonical', r.json()['error'])
286
287 def test_create_auth_zone_no_name(self):
288 name = unique_zone_name()
289 payload = {
290 'name': '',
291 'kind': 'Native',
292 }
293 print(payload)
294 r = self.session.post(
295 self.url("/api/v1/servers/localhost/zones"),
296 data=json.dumps(payload),
297 headers={'content-type': 'application/json'})
298 self.assertEquals(r.status_code, 422)
299 self.assertIn('is not canonical', r.json()['error'])
300
301 def test_create_zone_with_custom_soa(self):
302 name = unique_zone_name()
303 content = u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
304 rrset = {
305 "name": name,
306 "type": "soa", # test uppercasing of type, too.
307 "ttl": 3600,
308 "records": [{
309 "content": content,
310 "disabled": False,
311 }],
312 }
313 name, payload, data = self.create_zone(name=name, rrsets=[rrset], soa_edit_api='')
314 self.assertEquals(get_rrset(data, name, 'SOA')['records'], rrset['records'])
315 dbrecs = get_db_records(name, 'SOA')
316 self.assertEqual(dbrecs[0]['content'], content.replace('. ', ' '))
317
318 def test_create_zone_double_dot(self):
319 name = 'test..' + unique_zone_name()
320 payload = {
321 'name': name,
322 'kind': 'Native',
323 'nameservers': ['ns1.example.com.']
324 }
325 print(payload)
326 r = self.session.post(
327 self.url("/api/v1/servers/localhost/zones"),
328 data=json.dumps(payload),
329 headers={'content-type': 'application/json'})
330 self.assertEquals(r.status_code, 422)
331 self.assertIn('Unable to parse DNS Name', r.json()['error'])
332
333 def test_create_zone_restricted_chars(self):
334 name = 'test:' + unique_zone_name() # : isn't good as a name.
335 payload = {
336 'name': name,
337 'kind': 'Native',
338 'nameservers': ['ns1.example.com']
339 }
340 print(payload)
341 r = self.session.post(
342 self.url("/api/v1/servers/localhost/zones"),
343 data=json.dumps(payload),
344 headers={'content-type': 'application/json'})
345 self.assertEquals(r.status_code, 422)
346 self.assertIn('contains unsupported characters', r.json()['error'])
347
348 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self):
349 name = unique_zone_name()
350 rrset = {
351 "name": name,
352 "type": "NS",
353 "ttl": 3600,
354 "records": [{
355 "content": "ns2.example.com.",
356 "disabled": False,
357 }],
358 }
359 payload = {
360 'name': name,
361 'kind': 'Native',
362 'nameservers': ['ns1.example.com.'],
363 'rrsets': [rrset],
364 }
365 print(payload)
366 r = self.session.post(
367 self.url("/api/v1/servers/localhost/zones"),
368 data=json.dumps(payload),
369 headers={'content-type': 'application/json'})
370 self.assertEquals(r.status_code, 422)
371 self.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r.json()['error'])
372
373 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self):
374 name = unique_zone_name()
375 rrset = {
376 "name": 'subzone.'+name,
377 "type": "NS",
378 "ttl": 3600,
379 "records": [{
380 "content": "ns2.example.com.",
381 "disabled": False,
382 }],
383 }
384 payload = {
385 'name': name,
386 'kind': 'Native',
387 'nameservers': ['ns1.example.com.'],
388 'rrsets': [rrset],
389 }
390 print(payload)
391 r = self.session.post(
392 self.url("/api/v1/servers/localhost/zones"),
393 data=json.dumps(payload),
394 headers={'content-type': 'application/json'})
395 self.assert_success_json(r)
396
397 def test_create_zone_with_symbols(self):
398 name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
399 name = payload['name']
400 expected_id = name.replace('/', '=2F')
401 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
402 self.assertIn(k, data)
403 if k in payload:
404 self.assertEquals(data[k], payload[k])
405 self.assertEquals(data['id'], expected_id)
406 dbrecs = get_db_records(name, 'SOA')
407 self.assertEqual(dbrecs[0]['name'], name.rstrip('.'))
408
409 def test_create_zone_with_nameservers_non_string(self):
410 # ensure we don't crash
411 name = unique_zone_name()
412 payload = {
413 'name': name,
414 'kind': 'Native',
415 'nameservers': [{'a': 'ns1.example.com'}] # invalid
416 }
417 print(payload)
418 r = self.session.post(
419 self.url("/api/v1/servers/localhost/zones"),
420 data=json.dumps(payload),
421 headers={'content-type': 'application/json'})
422 self.assertEquals(r.status_code, 422)
423
424 def test_create_zone_with_dnssec(self):
425 """
426 Create a zone with "dnssec" set and see if a key was made.
427 """
428 name = unique_zone_name()
429 name, payload, data = self.create_zone(dnssec=True)
430
431 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
432
433 for k in ('dnssec', ):
434 self.assertIn(k, data)
435 if k in payload:
436 self.assertEquals(data[k], payload[k])
437
438 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
439
440 keys = r.json()
441
442 print(keys)
443
444 self.assertEquals(r.status_code, 200)
445 self.assertEquals(len(keys), 1)
446 self.assertEquals(keys[0]['type'], 'Cryptokey')
447 self.assertEquals(keys[0]['active'], True)
448 self.assertEquals(keys[0]['keytype'], 'csk')
449
450 def test_create_zone_with_dnssec_disable_dnssec(self):
451 """
452 Create a zone with "dnssec", then set "dnssec" to false and see if the
453 keys are gone
454 """
455 name = unique_zone_name()
456 name, payload, data = self.create_zone(dnssec=True)
457
458 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
459 data=json.dumps({'dnssec': False}))
460 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
461
462 zoneinfo = r.json()
463
464 self.assertEquals(r.status_code, 200)
465 self.assertEquals(zoneinfo['dnssec'], False)
466
467 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
468
469 keys = r.json()
470
471 self.assertEquals(r.status_code, 200)
472 self.assertEquals(len(keys), 0)
473
474 def test_create_zone_with_nsec3param(self):
475 """
476 Create a zone with "nsec3param" set and see if the metadata was added.
477 """
478 name = unique_zone_name()
479 nsec3param = '1 0 500 aabbccddeeff'
480 name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)
481
482 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
483
484 for k in ('dnssec', 'nsec3param'):
485 self.assertIn(k, data)
486 if k in payload:
487 self.assertEquals(data[k], payload[k])
488
489 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3PARAM'))
490
491 data = r.json()
492
493 print(data)
494
495 self.assertEquals(r.status_code, 200)
496 self.assertEquals(len(data['metadata']), 1)
497 self.assertEquals(data['kind'], 'NSEC3PARAM')
498 self.assertEquals(data['metadata'][0], nsec3param)
499
500 def test_create_zone_with_nsec3narrow(self):
501 """
502 Create a zone with "nsec3narrow" set and see if the metadata was added.
503 """
504 name = unique_zone_name()
505 nsec3param = '1 0 500 aabbccddeeff'
506 name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
507 nsec3narrow=True)
508
509 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
510
511 for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
512 self.assertIn(k, data)
513 if k in payload:
514 self.assertEquals(data[k], payload[k])
515
516 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3NARROW'))
517
518 data = r.json()
519
520 print(data)
521
522 self.assertEquals(r.status_code, 200)
523 self.assertEquals(len(data['metadata']), 1)
524 self.assertEquals(data['kind'], 'NSEC3NARROW')
525 self.assertEquals(data['metadata'][0], '1')
526
527 def test_create_zone_dnssec_serial(self):
528 """
529 Create a zone set/unset "dnssec" and see if the serial was increased
530 after every step
531 """
532 name = unique_zone_name()
533 name, payload, data = self.create_zone()
534
535 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
536 self.assertEquals(soa_serial[-2:], '01')
537
538 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
539 data=json.dumps({'dnssec': True}))
540 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
541
542 data = r.json()
543 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
544
545 self.assertEquals(r.status_code, 200)
546 self.assertEquals(soa_serial[-2:], '02')
547
548 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
549 data=json.dumps({'dnssec': False}))
550 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
551
552 data = r.json()
553 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
554
555 self.assertEquals(r.status_code, 200)
556 self.assertEquals(soa_serial[-2:], '03')
557
558 def test_zone_absolute_url(self):
559 name, payload, data = self.create_zone()
560 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
561 rdata = r.json()
562 print(rdata[0])
563 self.assertTrue(rdata[0]['url'].startswith('/api/v'))
564
565 def test_create_zone_metadata(self):
566 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
567 r = self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
568 data=json.dumps(payload_metadata))
569 rdata = r.json()
570 self.assertEquals(r.status_code, 201)
571 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
572
573 def test_create_zone_metadata_kind(self):
574 payload_metadata = {"metadata": ["127.0.0.2"]}
575 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
576 data=json.dumps(payload_metadata))
577 rdata = r.json()
578 self.assertEquals(r.status_code, 200)
579 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
580
581 def test_create_protected_zone_metadata(self):
582 # test whether it prevents modification of certain kinds
583 for k in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
584 payload = {"metadata": ["FOO", "BAR"]}
585 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k),
586 data=json.dumps(payload))
587 self.assertEquals(r.status_code, 422)
588
589 def test_retrieve_zone_metadata(self):
590 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
591 self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
592 data=json.dumps(payload_metadata))
593 r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata"))
594 rdata = r.json()
595 self.assertEquals(r.status_code, 200)
596 self.assertIn(payload_metadata, rdata)
597
598 def test_delete_zone_metadata(self):
599 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
600 self.assertEquals(r.status_code, 200)
601 r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
602 rdata = r.json()
603 self.assertEquals(r.status_code, 200)
604 self.assertEquals(rdata["metadata"], [])
605
606 def test_create_external_zone_metadata(self):
607 payload_metadata = {"metadata": ["My very important message"]}
608 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
609 data=json.dumps(payload_metadata))
610 self.assertEquals(r.status_code, 200)
611 rdata = r.json()
612 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
613
614 def test_create_metadata_in_non_existent_zone(self):
615 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
616 r = self.session.post(self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
617 data=json.dumps(payload_metadata))
618 self.assertEquals(r.status_code, 404)
619 # Note: errors should probably contain json (see #5988)
620 # self.assertIn('Could not find domain ', r.json()['error'])
621
622 def test_create_slave_zone(self):
623 # Test that nameservers can be absent for slave zones.
624 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
625 for k in ('name', 'masters', 'kind'):
626 self.assertIn(k, data)
627 self.assertEquals(data[k], payload[k])
628 print("payload:", payload)
629 print("data:", data)
630 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
631 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
632 zonelist = r.json()
633 print("zonelist:", zonelist)
634 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
635 # Also test that fetching the zone works.
636 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
637 data = r.json()
638 print("zone (fetched):", data)
639 for k in ('name', 'masters', 'kind'):
640 self.assertIn(k, data)
641 self.assertEquals(data[k], payload[k])
642 self.assertEqual(data['serial'], 0)
643 self.assertEqual(data['rrsets'], [])
644
645 def test_find_zone_by_name(self):
646 name = 'foo/' + unique_zone_name()
647 name, payload, data = self.create_zone(name=name)
648 r = self.session.get(self.url("/api/v1/servers/localhost/zones?zone=" + name))
649 data = r.json()
650 print(data)
651 self.assertEquals(data[0]['name'], name)
652
653 def test_delete_slave_zone(self):
654 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
655 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + data['id']))
656 r.raise_for_status()
657
658 def test_retrieve_slave_zone(self):
659 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
660 print("payload:", payload)
661 print("data:", data)
662 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/axfr-retrieve"))
663 data = r.json()
664 print("status for axfr-retrieve:", data)
665 self.assertEqual(data['result'], u'Added retrieval request for \'' + payload['name'] +
666 '\' from master 127.0.0.2')
667
668 def test_notify_master_zone(self):
669 name, payload, data = self.create_zone(kind='Master')
670 print("payload:", payload)
671 print("data:", data)
672 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/notify"))
673 data = r.json()
674 print("status for notify:", data)
675 self.assertEqual(data['result'], 'Notification queued')
676
677 def test_get_zone_with_symbols(self):
678 name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
679 name = payload['name']
680 zone_id = (name.replace('/', '=2F'))
681 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id))
682 data = r.json()
683 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
684 self.assertIn(k, data)
685 if k in payload:
686 self.assertEquals(data[k], payload[k])
687
688 def test_get_zone(self):
689 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
690 domains = r.json()
691 example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
692 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id']))
693 self.assert_success_json(r)
694 data = r.json()
695 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
696 self.assertIn(k, data)
697 self.assertEquals(data['name'], 'example.com.')
698
699 def test_import_zone_broken(self):
700 payload = {}
701 payload['zone'] = """
702 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
703 flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
704 ;; WARNING: recursion requested but not available
705
706 ;; OPT PSEUDOSECTION:
707 ; EDNS: version: 0, flags:; udp: 1680
708 ;; QUESTION SECTION:
709 ;powerdns.com. IN SOA
710
711 ;; ANSWER SECTION:
712 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
713 powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
714 powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
715 powerdns-broken.com. 86400 IN A 82.94.213.34
716 powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
717 powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
718 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
719 """
720 payload['name'] = 'powerdns-broken.com.'
721 payload['kind'] = 'Master'
722 payload['nameservers'] = []
723 r = self.session.post(
724 self.url("/api/v1/servers/localhost/zones"),
725 data=json.dumps(payload),
726 headers={'content-type': 'application/json'})
727 self.assertEquals(r.status_code, 422)
728
729 def test_import_zone_axfr_outofzone(self):
730 # Ensure we don't create out-of-zone records
731 name = unique_zone_name()
732 payload = {}
733 payload['zone'] = """
734 NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
735 NAME 3600 IN NS powerdnssec2.ds9a.nl.
736 example.org. 3600 IN AAAA 2001:888:2000:1d::2
737 NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
738 """.replace('NAME', name)
739 payload['name'] = name
740 payload['kind'] = 'Master'
741 payload['nameservers'] = []
742 r = self.session.post(
743 self.url("/api/v1/servers/localhost/zones"),
744 data=json.dumps(payload),
745 headers={'content-type': 'application/json'})
746 self.assertEquals(r.status_code, 422)
747 self.assertEqual(r.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
748
749 def test_import_zone_axfr(self):
750 payload = {}
751 payload['zone'] = """
752 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
753 ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
754 ;; WARNING: recursion requested but not available
755
756 ;; OPT PSEUDOSECTION:
757 ; EDNS: version: 0, flags:; udp: 1680
758 ;; QUESTION SECTION:
759 ;powerdns.com. IN SOA
760
761 ;; ANSWER SECTION:
762 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
763 powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
764 powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
765 powerdns.com. 86400 IN A 82.94.213.34
766 powerdns.com. 3600 IN MX 0 xs.powerdns.com.
767 powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
768 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
769 """
770 payload['name'] = 'powerdns.com.'
771 payload['kind'] = 'Master'
772 payload['nameservers'] = []
773 payload['soa_edit_api'] = '' # turn off so exact SOA comparison works.
774 r = self.session.post(
775 self.url("/api/v1/servers/localhost/zones"),
776 data=json.dumps(payload),
777 headers={'content-type': 'application/json'})
778 self.assert_success_json(r)
779 data = r.json()
780 self.assertIn('name', data)
781
782 expected = {
783 'NS': [
784 {'content': 'powerdnssec1.ds9a.nl.'},
785 {'content': 'powerdnssec2.ds9a.nl.'},
786 ],
787 'SOA': [
788 {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
789 ],
790 'MX': [
791 {'content': '0 xs.powerdns.com.'},
792 ],
793 'A': [
794 {'content': '82.94.213.34', 'name': 'powerdns.com.'},
795 ],
796 'AAAA': [
797 {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
798 ],
799 }
800
801 eq_zone_rrsets(data['rrsets'], expected)
802
803 # check content in DB is stored WITHOUT trailing dot.
804 dbrecs = get_db_records(payload['name'], 'NS')
805 dbrec = next((dbrec for dbrec in dbrecs if dbrec['content'].startswith('powerdnssec1')))
806 self.assertEqual(dbrec['content'], 'powerdnssec1.ds9a.nl')
807
808 def test_import_zone_bind(self):
809 payload = {}
810 payload['zone'] = """
811 $TTL 86400 ; 24 hours could have been written as 24h or 1d
812 ; $TTL used for all RRs without explicit TTL value
813 $ORIGIN example.org.
814 @ 1D IN SOA ns1.example.org. hostmaster.example.org. (
815 2002022401 ; serial
816 3H ; refresh
817 15 ; retry
818 1w ; expire
819 3h ; minimum
820 )
821 IN NS ns1.example.org. ; in the domain
822 IN NS ns2.smokeyjoe.com. ; external to domain
823 IN MX 10 mail.another.com. ; external mail provider
824 ; server host definitions
825 ns1 IN A 192.168.0.1 ;name server definition
826 www IN A 192.168.0.2 ;web server definition
827 ftp IN CNAME www.example.org. ;ftp server definition
828 ; non server domain hosts
829 bill IN A 192.168.0.3
830 fred IN A 192.168.0.4
831 """
832 payload['name'] = 'example.org.'
833 payload['kind'] = 'Master'
834 payload['nameservers'] = []
835 payload['soa_edit_api'] = '' # turn off so exact SOA comparison works.
836 r = self.session.post(
837 self.url("/api/v1/servers/localhost/zones"),
838 data=json.dumps(payload),
839 headers={'content-type': 'application/json'})
840 self.assert_success_json(r)
841 data = r.json()
842 self.assertIn('name', data)
843
844 expected = {
845 'NS': [
846 {'content': 'ns1.example.org.'},
847 {'content': 'ns2.smokeyjoe.com.'},
848 ],
849 'SOA': [
850 {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
851 ],
852 'MX': [
853 {'content': '10 mail.another.com.'},
854 ],
855 'A': [
856 {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
857 {'content': '192.168.0.2', 'name': 'www.example.org.'},
858 {'content': '192.168.0.3', 'name': 'bill.example.org.'},
859 {'content': '192.168.0.4', 'name': 'fred.example.org.'},
860 ],
861 'CNAME': [
862 {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
863 ],
864 }
865
866 eq_zone_rrsets(data['rrsets'], expected)
867
868 def test_export_zone_json(self):
869 name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
870 # export it
871 r = self.session.get(
872 self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
873 headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
874 )
875 self.assert_success_json(r)
876 data = r.json()
877 self.assertIn('zone', data)
878 expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
879 name + '\t3600\tIN\tNS\tns2.foo.com.',
880 name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
881 ' 0 10800 3600 604800 3600']
882 self.assertEquals(data['zone'].strip().split('\n'), expected_data)
883
884 def test_export_zone_text(self):
885 name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
886 # export it
887 r = self.session.get(
888 self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
889 headers={'accept': '*/*'}
890 )
891 data = r.text.strip().split("\n")
892 expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
893 name + '\t3600\tIN\tNS\tns2.foo.com.',
894 name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
895 ' 0 10800 3600 604800 3600']
896 self.assertEquals(data, expected_data)
897
898 def test_update_zone(self):
899 name, payload, zone = self.create_zone()
900 name = payload['name']
901 # update, set as Master and enable SOA-EDIT-API
902 payload = {
903 'kind': 'Master',
904 'masters': ['192.0.2.1', '192.0.2.2'],
905 'soa_edit_api': 'EPOCH',
906 'soa_edit': 'EPOCH'
907 }
908 r = self.session.put(
909 self.url("/api/v1/servers/localhost/zones/" + name),
910 data=json.dumps(payload),
911 headers={'content-type': 'application/json'})
912 self.assert_success(r)
913 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
914 for k in payload.keys():
915 self.assertIn(k, data)
916 self.assertEquals(data[k], payload[k])
917 # update, back to Native and empty(off)
918 payload = {
919 'kind': 'Native',
920 'soa_edit_api': '',
921 'soa_edit': ''
922 }
923 r = self.session.put(
924 self.url("/api/v1/servers/localhost/zones/" + name),
925 data=json.dumps(payload),
926 headers={'content-type': 'application/json'})
927 self.assert_success(r)
928 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
929 for k in payload.keys():
930 self.assertIn(k, data)
931 self.assertEquals(data[k], payload[k])
932
933 def test_zone_rr_update(self):
934 name, payload, zone = self.create_zone()
935 # do a replace (= update)
936 rrset = {
937 'changetype': 'replace',
938 'name': name,
939 'type': 'ns',
940 'ttl': 3600,
941 'records': [
942 {
943 "content": "ns1.bar.com.",
944 "disabled": False
945 },
946 {
947 "content": "ns2-disabled.bar.com.",
948 "disabled": True
949 }
950 ]
951 }
952 payload = {'rrsets': [rrset]}
953 r = self.session.patch(
954 self.url("/api/v1/servers/localhost/zones/" + name),
955 data=json.dumps(payload),
956 headers={'content-type': 'application/json'})
957 self.assert_success(r)
958 # verify that (only) the new record is there
959 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
960 self.assertEquals(get_rrset(data, name, 'NS')['records'], rrset['records'])
961
962 def test_zone_rr_update_mx(self):
963 # Important to test with MX records, as they have a priority field, which must end up in the content field.
964 name, payload, zone = self.create_zone()
965 # do a replace (= update)
966 rrset = {
967 'changetype': 'replace',
968 'name': name,
969 'type': 'MX',
970 'ttl': 3600,
971 'records': [
972 {
973 "content": "10 mail.example.org.",
974 "disabled": False
975 }
976 ]
977 }
978 payload = {'rrsets': [rrset]}
979 r = self.session.patch(
980 self.url("/api/v1/servers/localhost/zones/" + name),
981 data=json.dumps(payload),
982 headers={'content-type': 'application/json'})
983 self.assert_success(r)
984 # verify that (only) the new record is there
985 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
986 self.assertEquals(get_rrset(data, name, 'MX')['records'], rrset['records'])
987
988 def test_zone_rr_update_opt(self):
989 name, payload, zone = self.create_zone()
990 # do a replace (= update)
991 rrset = {
992 'changetype': 'replace',
993 'name': name,
994 'type': 'OPT',
995 'ttl': 3600,
996 'records': [
997 {
998 "content": "9",
999 "disabled": False
1000 }
1001 ]
1002 }
1003 payload = {'rrsets': [rrset]}
1004 r = self.session.patch(
1005 self.url("/api/v1/servers/localhost/zones/" + name),
1006 data=json.dumps(payload),
1007 headers={'content-type': 'application/json'})
1008 self.assertEquals(r.status_code, 422)
1009 self.assertIn('OPT: invalid type given', r.json()['error'])
1010
1011 def test_zone_rr_update_multiple_rrsets(self):
1012 name, payload, zone = self.create_zone()
1013 rrset1 = {
1014 'changetype': 'replace',
1015 'name': name,
1016 'type': 'NS',
1017 'ttl': 3600,
1018 'records': [
1019 {
1020
1021 "content": "ns9999.example.com.",
1022 "disabled": False
1023 }
1024 ]
1025 }
1026 rrset2 = {
1027 'changetype': 'replace',
1028 'name': name,
1029 'type': 'MX',
1030 'ttl': 3600,
1031 'records': [
1032 {
1033 "content": "10 mx444.example.com.",
1034 "disabled": False
1035 }
1036 ]
1037 }
1038 payload = {'rrsets': [rrset1, rrset2]}
1039 r = self.session.patch(
1040 self.url("/api/v1/servers/localhost/zones/" + name),
1041 data=json.dumps(payload),
1042 headers={'content-type': 'application/json'})
1043 self.assert_success(r)
1044 # verify that all rrsets have been updated
1045 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1046 self.assertEquals(get_rrset(data, name, 'NS')['records'], rrset1['records'])
1047 self.assertEquals(get_rrset(data, name, 'MX')['records'], rrset2['records'])
1048
1049 def test_zone_rr_update_duplicate_record(self):
1050 name, payload, zone = self.create_zone()
1051 rrset = {
1052 'changetype': 'replace',
1053 'name': name,
1054 'type': 'NS',
1055 'ttl': 3600,
1056 'records': [
1057 {"content": "ns9999.example.com.", "disabled": False},
1058 {"content": "ns9996.example.com.", "disabled": False},
1059 {"content": "ns9987.example.com.", "disabled": False},
1060 {"content": "ns9988.example.com.", "disabled": False},
1061 {"content": "ns9999.example.com.", "disabled": False},
1062 ]
1063 }
1064 payload = {'rrsets': [rrset]}
1065 r = self.session.patch(
1066 self.url("/api/v1/servers/localhost/zones/" + name),
1067 data=json.dumps(payload),
1068 headers={'content-type': 'application/json'})
1069 self.assertEquals(r.status_code, 422)
1070 self.assertIn('Duplicate record in RRset', r.json()['error'])
1071
1072 def test_zone_rr_update_duplicate_rrset(self):
1073 name, payload, zone = self.create_zone()
1074 rrset1 = {
1075 'changetype': 'replace',
1076 'name': name,
1077 'type': 'NS',
1078 'ttl': 3600,
1079 'records': [
1080 {
1081 "content": "ns9999.example.com.",
1082 "disabled": False
1083 }
1084 ]
1085 }
1086 rrset2 = {
1087 'changetype': 'replace',
1088 'name': name,
1089 'type': 'NS',
1090 'ttl': 3600,
1091 'records': [
1092 {
1093 "content": "ns9998.example.com.",
1094 "disabled": False
1095 }
1096 ]
1097 }
1098 payload = {'rrsets': [rrset1, rrset2]}
1099 r = self.session.patch(
1100 self.url("/api/v1/servers/localhost/zones/" + name),
1101 data=json.dumps(payload),
1102 headers={'content-type': 'application/json'})
1103 self.assertEquals(r.status_code, 422)
1104 self.assertIn('Duplicate RRset', r.json()['error'])
1105
1106 def test_zone_rr_delete(self):
1107 name, payload, zone = self.create_zone()
1108 # do a delete of all NS records (these are created with the zone)
1109 rrset = {
1110 'changetype': 'delete',
1111 'name': name,
1112 'type': 'NS'
1113 }
1114 payload = {'rrsets': [rrset]}
1115 r = self.session.patch(
1116 self.url("/api/v1/servers/localhost/zones/" + name),
1117 data=json.dumps(payload),
1118 headers={'content-type': 'application/json'})
1119 self.assert_success(r)
1120 # verify that the records are gone
1121 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1122 self.assertIsNone(get_rrset(data, name, 'NS'))
1123
1124 def test_zone_disable_reenable(self):
1125 # This also tests that SOA-EDIT-API works.
1126 name, payload, zone = self.create_zone(soa_edit_api='EPOCH')
1127 # disable zone by disabling SOA
1128 rrset = {
1129 'changetype': 'replace',
1130 'name': name,
1131 'type': 'SOA',
1132 'ttl': 3600,
1133 'records': [
1134 {
1135 "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
1136 "disabled": True
1137 }
1138 ]
1139 }
1140 payload = {'rrsets': [rrset]}
1141 r = self.session.patch(
1142 self.url("/api/v1/servers/localhost/zones/" + name),
1143 data=json.dumps(payload),
1144 headers={'content-type': 'application/json'})
1145 self.assert_success(r)
1146 # check SOA serial has been edited
1147 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1148 soa_serial1 = get_first_rec(data, name, 'SOA')['content'].split()[2]
1149 self.assertNotEquals(soa_serial1, '1')
1150 # make sure domain is still in zone list (disabled SOA!)
1151 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
1152 domains = r.json()
1153 self.assertEquals(len([domain for domain in domains if domain['name'] == name]), 1)
1154 # sleep 1sec to ensure the EPOCH value changes for the next request
1155 time.sleep(1)
1156 # verify that modifying it still works
1157 rrset['records'][0]['disabled'] = False
1158 payload = {'rrsets': [rrset]}
1159 r = self.session.patch(
1160 self.url("/api/v1/servers/localhost/zones/" + name),
1161 data=json.dumps(payload),
1162 headers={'content-type': 'application/json'})
1163 self.assert_success(r)
1164 # check SOA serial has been edited again
1165 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1166 soa_serial2 = get_first_rec(data, name, 'SOA')['content'].split()[2]
1167 self.assertNotEquals(soa_serial2, '1')
1168 self.assertNotEquals(soa_serial2, soa_serial1)
1169
1170 def test_zone_rr_update_out_of_zone(self):
1171 name, payload, zone = self.create_zone()
1172 # replace with qname mismatch
1173 rrset = {
1174 'changetype': 'replace',
1175 'name': 'not-in-zone.',
1176 'type': 'NS',
1177 'ttl': 3600,
1178 'records': [
1179 {
1180 "content": "ns1.bar.com.",
1181 "disabled": False
1182 }
1183 ]
1184 }
1185 payload = {'rrsets': [rrset]}
1186 r = self.session.patch(
1187 self.url("/api/v1/servers/localhost/zones/" + name),
1188 data=json.dumps(payload),
1189 headers={'content-type': 'application/json'})
1190 self.assertEquals(r.status_code, 422)
1191 self.assertIn('out of zone', r.json()['error'])
1192
1193 def test_zone_rr_update_restricted_chars(self):
1194 name, payload, zone = self.create_zone()
1195 # replace with qname mismatch
1196 rrset = {
1197 'changetype': 'replace',
1198 'name': 'test:' + name,
1199 'type': 'NS',
1200 'ttl': 3600,
1201 'records': [
1202 {
1203 "content": "ns1.bar.com.",
1204 "disabled": False
1205 }
1206 ]
1207 }
1208 payload = {'rrsets': [rrset]}
1209 r = self.session.patch(
1210 self.url("/api/v1/servers/localhost/zones/" + name),
1211 data=json.dumps(payload),
1212 headers={'content-type': 'application/json'})
1213 self.assertEquals(r.status_code, 422)
1214 self.assertIn('contains unsupported characters', r.json()['error'])
1215
1216 def test_rrset_unknown_type(self):
1217 name, payload, zone = self.create_zone()
1218 rrset = {
1219 'changetype': 'replace',
1220 'name': name,
1221 'type': 'FAFAFA',
1222 'ttl': 3600,
1223 'records': [
1224 {
1225 "content": "4.3.2.1",
1226 "disabled": False
1227 }
1228 ]
1229 }
1230 payload = {'rrsets': [rrset]}
1231 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1232 headers={'content-type': 'application/json'})
1233 self.assertEquals(r.status_code, 422)
1234 self.assertIn('unknown type', r.json()['error'])
1235
1236 def test_rrset_cname_and_other(self):
1237 name, payload, zone = self.create_zone()
1238 rrset = {
1239 'changetype': 'replace',
1240 'name': name,
1241 'type': 'CNAME',
1242 'ttl': 3600,
1243 'records': [
1244 {
1245 "content": "example.org.",
1246 "disabled": False
1247 }
1248 ]
1249 }
1250 payload = {'rrsets': [rrset]}
1251 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1252 headers={'content-type': 'application/json'})
1253 self.assertEquals(r.status_code, 422)
1254 self.assertIn('Conflicts with pre-existing non-CNAME RRset', r.json()['error'])
1255
1256 def test_rrset_other_and_cname(self):
1257 name, payload, zone = self.create_zone()
1258 rrset = {
1259 'changetype': 'replace',
1260 'name': 'sub.'+name,
1261 'type': 'CNAME',
1262 'ttl': 3600,
1263 'records': [
1264 {
1265 "content": "example.org.",
1266 "disabled": False
1267 }
1268 ]
1269 }
1270 payload = {'rrsets': [rrset]}
1271 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1272 headers={'content-type': 'application/json'})
1273 self.assert_success(r)
1274 rrset = {
1275 'changetype': 'replace',
1276 'name': 'sub.'+name,
1277 'type': 'A',
1278 'ttl': 3600,
1279 'records': [
1280 {
1281 "content": "1.2.3.4",
1282 "disabled": False
1283 }
1284 ]
1285 }
1286 payload = {'rrsets': [rrset]}
1287 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1288 headers={'content-type': 'application/json'})
1289 self.assertEquals(r.status_code, 422)
1290 self.assertIn('Conflicts with pre-existing CNAME RRset', r.json()['error'])
1291
1292 def test_create_zone_with_leading_space(self):
1293 # Actual regression.
1294 name, payload, zone = self.create_zone()
1295 rrset = {
1296 'changetype': 'replace',
1297 'name': name,
1298 'type': 'A',
1299 'ttl': 3600,
1300 'records': [
1301 {
1302 "content": " 4.3.2.1",
1303 "disabled": False
1304 }
1305 ]
1306 }
1307 payload = {'rrsets': [rrset]}
1308 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1309 headers={'content-type': 'application/json'})
1310 self.assertEquals(r.status_code, 422)
1311 self.assertIn('Not in expected format', r.json()['error'])
1312
1313 def test_zone_rr_delete_out_of_zone(self):
1314 name, payload, zone = self.create_zone()
1315 rrset = {
1316 'changetype': 'delete',
1317 'name': 'not-in-zone.',
1318 'type': 'NS'
1319 }
1320 payload = {'rrsets': [rrset]}
1321 r = self.session.patch(
1322 self.url("/api/v1/servers/localhost/zones/" + name),
1323 data=json.dumps(payload),
1324 headers={'content-type': 'application/json'})
1325 print(r.content)
1326 self.assert_success(r) # succeed so users can fix their wrong, old data
1327
1328 def test_zone_delete(self):
1329 name, payload, zone = self.create_zone()
1330 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
1331 self.assertEquals(r.status_code, 204)
1332 self.assertNotIn('Content-Type', r.headers)
1333
1334 def test_zone_comment_create(self):
1335 name, payload, zone = self.create_zone()
1336 rrset = {
1337 'changetype': 'replace',
1338 'name': name,
1339 'type': 'NS',
1340 'ttl': 3600,
1341 'comments': [
1342 {
1343 'account': 'test1',
1344 'content': 'blah blah',
1345 },
1346 {
1347 'account': 'test2',
1348 'content': 'blah blah bleh',
1349 }
1350 ]
1351 }
1352 payload = {'rrsets': [rrset]}
1353 r = self.session.patch(
1354 self.url("/api/v1/servers/localhost/zones/" + name),
1355 data=json.dumps(payload),
1356 headers={'content-type': 'application/json'})
1357 self.assert_success(r)
1358 # make sure the comments have been set, and that the NS
1359 # records are still present
1360 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1361 serverset = get_rrset(data, name, 'NS')
1362 print(serverset)
1363 self.assertNotEquals(serverset['records'], [])
1364 self.assertNotEquals(serverset['comments'], [])
1365 # verify that modified_at has been set by pdns
1366 self.assertNotEquals([c for c in serverset['comments']][0]['modified_at'], 0)
1367 # verify that TTL is correct (regression test)
1368 self.assertEquals(serverset['ttl'], 3600)
1369
1370 def test_zone_comment_delete(self):
1371 # Test: Delete ONLY comments.
1372 name, payload, zone = self.create_zone()
1373 rrset = {
1374 'changetype': 'replace',
1375 'name': name,
1376 'type': 'NS',
1377 'comments': []
1378 }
1379 payload = {'rrsets': [rrset]}
1380 r = self.session.patch(
1381 self.url("/api/v1/servers/localhost/zones/" + name),
1382 data=json.dumps(payload),
1383 headers={'content-type': 'application/json'})
1384 self.assert_success(r)
1385 # make sure the NS records are still present
1386 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1387 serverset = get_rrset(data, name, 'NS')
1388 print(serverset)
1389 self.assertNotEquals(serverset['records'], [])
1390 self.assertEquals(serverset['comments'], [])
1391
1392 def test_zone_comment_stay_intact(self):
1393 # Test if comments on an rrset stay intact if the rrset is replaced
1394 name, payload, zone = self.create_zone()
1395 # create a comment
1396 rrset = {
1397 'changetype': 'replace',
1398 'name': name,
1399 'type': 'NS',
1400 'comments': [
1401 {
1402 'account': 'test1',
1403 'content': 'oh hi there',
1404 'modified_at': 1111
1405 }
1406 ]
1407 }
1408 payload = {'rrsets': [rrset]}
1409 r = self.session.patch(
1410 self.url("/api/v1/servers/localhost/zones/" + name),
1411 data=json.dumps(payload),
1412 headers={'content-type': 'application/json'})
1413 self.assert_success(r)
1414 # replace rrset records
1415 rrset2 = {
1416 'changetype': 'replace',
1417 'name': name,
1418 'type': 'NS',
1419 'ttl': 3600,
1420 'records': [
1421 {
1422 "content": "ns1.bar.com.",
1423 "disabled": False
1424 }
1425 ]
1426 }
1427 payload2 = {'rrsets': [rrset2]}
1428 r = self.session.patch(
1429 self.url("/api/v1/servers/localhost/zones/" + name),
1430 data=json.dumps(payload2),
1431 headers={'content-type': 'application/json'})
1432 self.assert_success(r)
1433 # make sure the comments still exist
1434 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1435 serverset = get_rrset(data, name, 'NS')
1436 print(serverset)
1437 self.assertEquals(serverset['records'], rrset2['records'])
1438 self.assertEquals(serverset['comments'], rrset['comments'])
1439
1440 def test_zone_auto_ptr_ipv4_create(self):
1441 revzone = '4.2.192.in-addr.arpa.'
1442 _, _, revzonedata = self.create_zone(name=revzone)
1443 name = unique_zone_name()
1444 rrset = {
1445 "name": name,
1446 "type": "A",
1447 "ttl": 3600,
1448 "records": [{
1449 "content": "192.2.4.44",
1450 "disabled": False,
1451 "set-ptr": True,
1452 }],
1453 }
1454 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
1455 del rrset['records'][0]['set-ptr']
1456 self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
1457 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1458 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1459 print(revsets)
1460 self.assertEquals(revsets, [{
1461 u'name': u'44.4.2.192.in-addr.arpa.',
1462 u'ttl': 3600,
1463 u'type': u'PTR',
1464 u'comments': [],
1465 u'records': [{
1466 u'content': name,
1467 u'disabled': False,
1468 }],
1469 }])
1470 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1471 self.assertGreater(r['serial'], revzonedata['serial'])
1472
1473 def test_zone_auto_ptr_ipv4_update(self):
1474 revzone = '0.2.192.in-addr.arpa.'
1475 _, _, revzonedata = self.create_zone(name=revzone)
1476 name, payload, zone = self.create_zone()
1477 rrset = {
1478 'changetype': 'replace',
1479 'name': name,
1480 'type': 'A',
1481 'ttl': 3600,
1482 'records': [
1483 {
1484 "content": '192.2.0.2',
1485 "disabled": False,
1486 "set-ptr": True
1487 }
1488 ]
1489 }
1490 payload = {'rrsets': [rrset]}
1491 r = self.session.patch(
1492 self.url("/api/v1/servers/localhost/zones/" + name),
1493 data=json.dumps(payload),
1494 headers={'content-type': 'application/json'})
1495 self.assert_success(r)
1496 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1497 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1498 print(revsets)
1499 self.assertEquals(revsets, [{
1500 u'name': u'2.0.2.192.in-addr.arpa.',
1501 u'ttl': 3600,
1502 u'type': u'PTR',
1503 u'comments': [],
1504 u'records': [{
1505 u'content': name,
1506 u'disabled': False,
1507 }],
1508 }])
1509 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1510 self.assertGreater(r['serial'], revzonedata['serial'])
1511
1512 def test_zone_auto_ptr_ipv6_update(self):
1513 # 2001:DB8::bb:aa
1514 revzone = '8.b.d.0.1.0.0.2.ip6.arpa.'
1515 _, _, revzonedata = self.create_zone(name=revzone)
1516 name, payload, zone = self.create_zone()
1517 rrset = {
1518 'changetype': 'replace',
1519 'name': name,
1520 'type': 'AAAA',
1521 'ttl': 3600,
1522 'records': [
1523 {
1524 "content": '2001:DB8::bb:aa',
1525 "disabled": False,
1526 "set-ptr": True
1527 }
1528 ]
1529 }
1530 payload = {'rrsets': [rrset]}
1531 r = self.session.patch(
1532 self.url("/api/v1/servers/localhost/zones/" + name),
1533 data=json.dumps(payload),
1534 headers={'content-type': 'application/json'})
1535 self.assert_success(r)
1536 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1537 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1538 print(revsets)
1539 self.assertEquals(revsets, [{
1540 u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
1541 u'ttl': 3600,
1542 u'type': u'PTR',
1543 u'comments': [],
1544 u'records': [{
1545 u'content': name,
1546 u'disabled': False,
1547 }],
1548 }])
1549 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1550 self.assertGreater(r['serial'], revzonedata['serial'])
1551
1552 def test_search_rr_exact_zone(self):
1553 name = unique_zone_name()
1554 self.create_zone(name=name, serial=22, soa_edit_api='')
1555 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')))
1556 self.assert_success_json(r)
1557 print(r.json())
1558 self.assertEquals(r.json(), [
1559 {u'object_type': u'zone', u'name': name, u'zone_id': name},
1560 {u'content': u'ns1.example.com.',
1561 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1562 u'ttl': 3600, u'type': u'NS', u'name': name},
1563 {u'content': u'ns2.example.com.',
1564 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1565 u'ttl': 3600, u'type': u'NS', u'name': name},
1566 {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
1567 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1568 u'ttl': 3600, u'type': u'SOA', u'name': name},
1569 ])
1570
1571 def test_search_rr_substring(self):
1572 name = unique_zone_name()
1573 search = name[5:-5]
1574 self.create_zone(name=name)
1575 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
1576 self.assert_success_json(r)
1577 print(r.json())
1578 # should return zone, SOA, ns1, ns2
1579 self.assertEquals(len(r.json()), 4)
1580
1581 def test_search_rr_case_insensitive(self):
1582 name = unique_zone_name()+'testsuffix.'
1583 self.create_zone(name=name)
1584 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
1585 self.assert_success_json(r)
1586 print(r.json())
1587 # should return zone, SOA, ns1, ns2
1588 self.assertEquals(len(r.json()), 4)
1589
1590 def test_search_after_rectify_with_ent(self):
1591 name = unique_zone_name()
1592 search = name.split('.')[0]
1593 rrset = {
1594 "name": 'sub.sub.' + name,
1595 "type": "A",
1596 "ttl": 3600,
1597 "records": [{
1598 "content": "4.3.2.1",
1599 "disabled": False,
1600 }],
1601 }
1602 self.create_zone(name=name, rrsets=[rrset])
1603 pdnsutil_rectify(name)
1604 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
1605 self.assert_success_json(r)
1606 print(r.json())
1607 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
1608 self.assertEquals(len(r.json()), 5)
1609
1610 def test_cname_at_ent_place(self):
1611 name, payload, zone = self.create_zone(api_rectify=True)
1612 rrset = {
1613 'changetype': 'replace',
1614 'name': 'sub2.sub1.' + name,
1615 'type': "A",
1616 'ttl': 3600,
1617 'records': [{
1618 'content': "4.3.2.1",
1619 'disabled': False,
1620 }],
1621 }
1622 payload = {'rrsets': [rrset]}
1623 r = self.session.patch(
1624 self.url("/api/v1/servers/localhost/zones/" + zone['id']),
1625 data=json.dumps(payload),
1626 headers={'content-type': 'application/json'})
1627 self.assertEquals(r.status_code, 204)
1628 rrset = {
1629 'changetype': 'replace',
1630 'name': 'sub1.' + name,
1631 'type': "CNAME",
1632 'ttl': 3600,
1633 'records': [{
1634 'content': "www.example.org.",
1635 'disabled': False,
1636 }],
1637 }
1638 payload = {'rrsets': [rrset]}
1639 r = self.session.patch(
1640 self.url("/api/v1/servers/localhost/zones/" + zone['id']),
1641 data=json.dumps(payload),
1642 headers={'content-type': 'application/json'})
1643 self.assertEquals(r.status_code, 204)
1644
1645 def test_rrset_parameter_post_false(self):
1646 name = unique_zone_name()
1647 payload = {
1648 'name': name,
1649 'kind': 'Native',
1650 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
1651 }
1652 r = self.session.post(
1653 self.url("/api/v1/servers/localhost/zones?rrsets=false"),
1654 data=json.dumps(payload),
1655 headers={'content-type': 'application/json'})
1656 print(r.json())
1657 self.assert_success_json(r)
1658 self.assertEquals(r.status_code, 201)
1659 self.assertEquals(r.json().get('rrsets'), None)
1660
1661 def test_rrset_false_parameter(self):
1662 name = unique_zone_name()
1663 self.create_zone(name=name, kind='Native')
1664 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false"))
1665 self.assert_success_json(r)
1666 print(r.json())
1667 self.assertEquals(r.json().get('rrsets'), None)
1668
1669 def test_rrset_true_parameter(self):
1670 name = unique_zone_name()
1671 self.create_zone(name=name, kind='Native')
1672 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true"))
1673 self.assert_success_json(r)
1674 print(r.json())
1675 self.assertEquals(len(r.json().get('rrsets')), 2)
1676
1677 def test_wrong_rrset_parameter(self):
1678 name = unique_zone_name()
1679 self.create_zone(name=name, kind='Native')
1680 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=foobar"))
1681 self.assertEquals(r.status_code, 422)
1682 self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
1683
1684
1685 @unittest.skipIf(not is_auth(), "Not applicable")
1686 class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
1687
1688 def setUp(self):
1689 super(AuthRootZone, self).setUp()
1690 # zone name is not unique, so delete the zone before each individual test.
1691 self.session.delete(self.url("/api/v1/servers/localhost/zones/=2E"))
1692
1693 def test_create_zone(self):
1694 name, payload, data = self.create_zone(name='.', serial=22, soa_edit_api='')
1695 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
1696 self.assertIn(k, data)
1697 if k in payload:
1698 self.assertEquals(data[k], payload[k])
1699 # validate generated SOA
1700 rec = get_first_rec(data, '.', 'SOA')
1701 self.assertEquals(
1702 rec['content'],
1703 "a.misconfigured.powerdns.server. hostmaster. " + str(payload['serial']) +
1704 " 10800 3600 604800 3600"
1705 )
1706 # Regression test: verify zone list works
1707 zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
1708 print("zonelist:", zonelist)
1709 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
1710 # Also test that fetching the zone works.
1711 print("id:", data['id'])
1712 self.assertEquals(data['id'], '=2E')
1713 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
1714 print("zone (fetched):", data)
1715 for k in ('name', 'kind'):
1716 self.assertIn(k, data)
1717 self.assertEquals(data[k], payload[k])
1718 self.assertEqual(data['rrsets'][0]['name'], '.')
1719
1720 def test_update_zone(self):
1721 name, payload, zone = self.create_zone(name='.')
1722 zone_id = '=2E'
1723 # update, set as Master and enable SOA-EDIT-API
1724 payload = {
1725 'kind': 'Master',
1726 'masters': ['192.0.2.1', '192.0.2.2'],
1727 'soa_edit_api': 'EPOCH',
1728 'soa_edit': 'EPOCH'
1729 }
1730 r = self.session.put(
1731 self.url("/api/v1/servers/localhost/zones/" + zone_id),
1732 data=json.dumps(payload),
1733 headers={'content-type': 'application/json'})
1734 self.assert_success(r)
1735 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
1736 for k in payload.keys():
1737 self.assertIn(k, data)
1738 self.assertEquals(data[k], payload[k])
1739 # update, back to Native and empty(off)
1740 payload = {
1741 'kind': 'Native',
1742 'soa_edit_api': '',
1743 'soa_edit': ''
1744 }
1745 r = self.session.put(
1746 self.url("/api/v1/servers/localhost/zones/" + zone_id),
1747 data=json.dumps(payload),
1748 headers={'content-type': 'application/json'})
1749 self.assert_success(r)
1750 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
1751 for k in payload.keys():
1752 self.assertIn(k, data)
1753 self.assertEquals(data[k], payload[k])
1754
1755
1756 @unittest.skipIf(not is_recursor(), "Not applicable")
1757 class RecursorZones(ApiTestCase):
1758
1759 def create_zone(self, name=None, kind=None, rd=False, servers=None):
1760 if name is None:
1761 name = unique_zone_name()
1762 if servers is None:
1763 servers = []
1764 payload = {
1765 'name': name,
1766 'kind': kind,
1767 'servers': servers,
1768 'recursion_desired': rd
1769 }
1770 r = self.session.post(
1771 self.url("/api/v1/servers/localhost/zones"),
1772 data=json.dumps(payload),
1773 headers={'content-type': 'application/json'})
1774 self.assert_success_json(r)
1775 return payload, r.json()
1776
1777 def test_create_auth_zone(self):
1778 payload, data = self.create_zone(kind='Native')
1779 for k in payload.keys():
1780 self.assertEquals(data[k], payload[k])
1781
1782 def test_create_zone_no_name(self):
1783 payload = {
1784 'name': '',
1785 'kind': 'Native',
1786 'servers': ['8.8.8.8'],
1787 'recursion_desired': False,
1788 }
1789 print(payload)
1790 r = self.session.post(
1791 self.url("/api/v1/servers/localhost/zones"),
1792 data=json.dumps(payload),
1793 headers={'content-type': 'application/json'})
1794 self.assertEquals(r.status_code, 422)
1795 self.assertIn('is not canonical', r.json()['error'])
1796
1797 def test_create_forwarded_zone(self):
1798 payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
1799 # return values are normalized
1800 payload['servers'][0] += ':53'
1801 for k in payload.keys():
1802 self.assertEquals(data[k], payload[k])
1803
1804 def test_create_forwarded_rd_zone(self):
1805 payload, data = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
1806 # return values are normalized
1807 payload['servers'][0] += ':53'
1808 for k in payload.keys():
1809 self.assertEquals(data[k], payload[k])
1810
1811 def test_create_auth_zone_with_symbols(self):
1812 payload, data = self.create_zone(name='foo/bar.'+unique_zone_name(), kind='Native')
1813 expected_id = (payload['name'].replace('/', '=2F'))
1814 for k in payload.keys():
1815 self.assertEquals(data[k], payload[k])
1816 self.assertEquals(data['id'], expected_id)
1817
1818 def test_rename_auth_zone(self):
1819 payload, data = self.create_zone(kind='Native')
1820 name = payload['name']
1821 # now rename it
1822 payload = {
1823 'name': 'renamed-'+name,
1824 'kind': 'Native',
1825 'recursion_desired': False
1826 }
1827 r = self.session.put(
1828 self.url("/api/v1/servers/localhost/zones/" + name),
1829 data=json.dumps(payload),
1830 headers={'content-type': 'application/json'})
1831 self.assert_success(r)
1832 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + payload['name'])).json()
1833 for k in payload.keys():
1834 self.assertEquals(data[k], payload[k])
1835
1836 def test_zone_delete(self):
1837 payload, zone = self.create_zone(kind='Native')
1838 name = payload['name']
1839 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
1840 self.assertEquals(r.status_code, 204)
1841 self.assertNotIn('Content-Type', r.headers)
1842
1843 def test_search_rr_exact_zone(self):
1844 name = unique_zone_name()
1845 self.create_zone(name=name, kind='Native')
1846 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name))
1847 self.assert_success_json(r)
1848 print(r.json())
1849 self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name}])
1850
1851 def test_search_rr_substring(self):
1852 name = 'search-rr-zone.name.'
1853 self.create_zone(name=name, kind='Native')
1854 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
1855 self.assert_success_json(r)
1856 print(r.json())
1857 # should return zone, SOA
1858 self.assertEquals(len(r.json()), 2)
1859
1860 @unittest.skipIf(not is_auth(), "Not applicable")
1861 class AuthZoneKeys(ApiTestCase, AuthZonesHelperMixin):
1862
1863 def test_get_keys(self):
1864 r = self.session.get(
1865 self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
1866 self.assert_success_json(r)
1867 keys = r.json()
1868 self.assertGreater(len(keys), 0)
1869
1870 key0 = deepcopy(keys[0])
1871 del key0['dnskey']
1872 del key0['ds']
1873 expected = {
1874 u'algorithm': u'ECDSAP256SHA256',
1875 u'bits': 256,
1876 u'active': True,
1877 u'type': u'Cryptokey',
1878 u'keytype': u'csk',
1879 u'flags': 257,
1880 u'id': 1}
1881 self.assertEquals(key0, expected)
1882
1883 keydata = keys[0]['dnskey'].split()
1884 self.assertEqual(len(keydata), 4)