]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
Merge pull request #6823 from klaus3000/load-ourserial-on-NOTIFY
[thirdparty/pdns.git] / regression-tests.api / test_Zones.py
1 from __future__ import print_function
2 import json
3 import time
4 import unittest
5 from copy import deepcopy
6 from pprint import pprint
7 from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor, get_db_records, pdnsutil_rectify
8
9
10 def get_rrset(data, qname, qtype):
11 for rrset in data['rrsets']:
12 if rrset['name'] == qname and rrset['type'] == qtype:
13 return rrset
14 return None
15
16
17 def get_first_rec(data, qname, qtype):
18 rrset = get_rrset(data, qname, qtype)
19 if rrset:
20 return rrset['records'][0]
21 return None
22
23
24 def eq_zone_rrsets(rrsets, expected):
25 data_got = {}
26 data_expected = {}
27 for type_, expected_records in expected.items():
28 type_ = str(type_)
29 data_got[type_] = set()
30 data_expected[type_] = set()
31 uses_name = any(['name' in expected_record for expected_record in expected_records])
32 # minify + convert received data
33 for rrset in [rrset for rrset in rrsets if rrset['type'] == type_]:
34 print(rrset)
35 for r in rrset['records']:
36 data_got[type_].add((rrset['name'] if uses_name else '@', rrset['type'], r['content']))
37 # minify expected data
38 for r in expected_records:
39 data_expected[type_].add((r['name'] if uses_name else '@', type_, r['content']))
40
41 print("eq_zone_rrsets: got:")
42 pprint(data_got)
43 print("eq_zone_rrsets: expected:")
44 pprint(data_expected)
45
46 assert data_got == data_expected, "%r != %r" % (data_got, data_expected)
47
48
49 class Zones(ApiTestCase):
50
51 def test_list_zones(self):
52 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
53 self.assert_success_json(r)
54 domains = r.json()
55 example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
56 self.assertEquals(len(example_com), 1)
57 example_com = example_com[0]
58 print(example_com)
59 required_fields = ['id', 'url', 'name', 'kind']
60 if is_auth():
61 required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial', 'account']
62 self.assertNotEquals(example_com['serial'], 0)
63 elif is_recursor():
64 required_fields = required_fields + ['recursion_desired', 'servers']
65 for field in required_fields:
66 self.assertIn(field, example_com)
67
68
69 class AuthZonesHelperMixin(object):
70 def create_zone(self, name=None, **kwargs):
71 if name is None:
72 name = unique_zone_name()
73 payload = {
74 'name': name,
75 'kind': 'Native',
76 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
77 }
78 for k, v in kwargs.items():
79 if v is None:
80 del payload[k]
81 else:
82 payload[k] = v
83 print("sending", payload)
84 r = self.session.post(
85 self.url("/api/v1/servers/localhost/zones"),
86 data=json.dumps(payload),
87 headers={'content-type': 'application/json'})
88 self.assert_success_json(r)
89 self.assertEquals(r.status_code, 201)
90 reply = r.json()
91 print("reply", reply)
92 return name, payload, reply
93
94
95 @unittest.skipIf(not is_auth(), "Not applicable")
96 class AuthZones(ApiTestCase, AuthZonesHelperMixin):
97
98 def test_create_zone(self):
99 # soa_edit_api has a default, override with empty for this test
100 name, payload, data = self.create_zone(serial=22, soa_edit_api='')
101 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
102 self.assertIn(k, data)
103 if k in payload:
104 self.assertEquals(data[k], payload[k])
105 # validate generated SOA
106 expected_soa = "a.misconfigured.powerdns.server. hostmaster." + name + " " + \
107 str(payload['serial']) + " 10800 3600 604800 3600"
108 self.assertEquals(
109 get_first_rec(data, name, 'SOA')['content'],
110 expected_soa
111 )
112 # Because we had confusion about dots, check that the DB is without dots.
113 dbrecs = get_db_records(name, 'SOA')
114 self.assertEqual(dbrecs[0]['content'], expected_soa.replace('. ', ' '))
115
116 def test_create_zone_with_soa_edit_api(self):
117 # soa_edit_api wins over serial
118 name, payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
119 for k in ('soa_edit_api', ):
120 self.assertIn(k, data)
121 if k in payload:
122 self.assertEquals(data[k], payload[k])
123 # generated EPOCH serial surely is > fixed serial we passed in
124 print(data)
125 self.assertGreater(data['serial'], payload['serial'])
126 soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
127 self.assertGreater(soa_serial, payload['serial'])
128 self.assertEquals(soa_serial, data['serial'])
129
130 def test_create_zone_with_account(self):
131 # soa_edit_api wins over serial
132 name, payload, data = self.create_zone(account='anaccount', serial=10)
133 print(data)
134 for k in ('account', ):
135 self.assertIn(k, data)
136 if k in payload:
137 self.assertEquals(data[k], payload[k])
138
139 def test_create_zone_default_soa_edit_api(self):
140 name, payload, data = self.create_zone()
141 print(data)
142 self.assertEquals(data['soa_edit_api'], 'DEFAULT')
143
144 def test_create_zone_exists(self):
145 name, payload, data = self.create_zone()
146 print(data)
147 payload = {
148 'name': name,
149 'kind': 'Native'
150 }
151 print(payload)
152 r = self.session.post(
153 self.url("/api/v1/servers/localhost/zones"),
154 data=json.dumps(payload),
155 headers={'content-type': 'application/json'})
156 self.assertEquals(r.status_code, 409) # Conflict - already exists
157
158 def test_create_zone_with_soa_edit(self):
159 name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
160 print(data)
161 self.assertEquals(data['soa_edit'], 'INCEPTION-INCREMENT')
162 self.assertEquals(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
163 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
164 # These particular settings lead to the first serial set to YYYYMMDD01.
165 self.assertEquals(soa_serial[-2:], '01')
166 rrset = {
167 'changetype': 'replace',
168 'name': name,
169 'type': 'A',
170 'ttl': 3600,
171 'records': [
172 {
173 "content": "127.0.0.1",
174 "disabled": False
175 }
176 ]
177 }
178 payload = {'rrsets': [rrset]}
179 self.session.patch(
180 self.url("/api/v1/servers/localhost/zones/" + data['id']),
181 data=json.dumps(payload),
182 headers={'content-type': 'application/json'})
183 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
184 data = r.json()
185 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
186 self.assertEquals(soa_serial[-2:], '02')
187
188 def test_create_zone_with_records(self):
189 name = unique_zone_name()
190 rrset = {
191 "name": name,
192 "type": "A",
193 "ttl": 3600,
194 "records": [{
195 "content": "4.3.2.1",
196 "disabled": False,
197 }],
198 }
199 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
200 # check our record has appeared
201 self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
202
203 def test_create_zone_with_wildcard_records(self):
204 name = unique_zone_name()
205 rrset = {
206 "name": "*."+name,
207 "type": "A",
208 "ttl": 3600,
209 "records": [{
210 "content": "4.3.2.1",
211 "disabled": False,
212 }],
213 }
214 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
215 # check our record has appeared
216 self.assertEquals(get_rrset(data, rrset['name'], 'A')['records'], rrset['records'])
217
218 def test_create_zone_with_comments(self):
219 name = unique_zone_name()
220 rrsets = [
221 {
222 "name": name,
223 "type": "soa", # test uppercasing of type, too.
224 "comments": [{
225 "account": "test1",
226 "content": "blah blah",
227 "modified_at": 11112,
228 }],
229 },
230 {
231 "name": name,
232 "type": "AAAA",
233 "ttl": 3600,
234 "records": [{
235 "content": "2001:DB8::1",
236 "disabled": False,
237 }],
238 "comments": [{
239 "account": "test AAAA",
240 "content": "blah blah AAAA",
241 "modified_at": 11112,
242 }],
243 },
244 {
245 "name": name,
246 "type": "TXT",
247 "ttl": 3600,
248 "records": [{
249 "content": "\"test TXT\"",
250 "disabled": False,
251 }],
252 },
253 {
254 "name": name,
255 "type": "A",
256 "ttl": 3600,
257 "records": [{
258 "content": "192.0.2.1",
259 "disabled": False,
260 }],
261 },
262 ]
263 name, payload, data = self.create_zone(name=name, rrsets=rrsets)
264 # NS records have been created
265 self.assertEquals(len(data['rrsets']), len(rrsets) + 1)
266 # check our comment has appeared
267 self.assertEquals(get_rrset(data, name, 'SOA')['comments'], rrsets[0]['comments'])
268 self.assertEquals(get_rrset(data, name, 'A')['comments'], [])
269 self.assertEquals(get_rrset(data, name, 'TXT')['comments'], [])
270 self.assertEquals(get_rrset(data, name, 'AAAA')['comments'], rrsets[1]['comments'])
271
272 def test_create_zone_uncanonical_nameservers(self):
273 name = unique_zone_name()
274 payload = {
275 'name': name,
276 'kind': 'Native',
277 'nameservers': ['uncanon.example.com']
278 }
279 print(payload)
280 r = self.session.post(
281 self.url("/api/v1/servers/localhost/zones"),
282 data=json.dumps(payload),
283 headers={'content-type': 'application/json'})
284 self.assertEquals(r.status_code, 422)
285 self.assertIn('Nameserver is not canonical', r.json()['error'])
286
287 def test_create_auth_zone_no_name(self):
288 name = unique_zone_name()
289 payload = {
290 'name': '',
291 'kind': 'Native',
292 }
293 print(payload)
294 r = self.session.post(
295 self.url("/api/v1/servers/localhost/zones"),
296 data=json.dumps(payload),
297 headers={'content-type': 'application/json'})
298 self.assertEquals(r.status_code, 422)
299 self.assertIn('is not canonical', r.json()['error'])
300
301 def test_create_zone_with_custom_soa(self):
302 name = unique_zone_name()
303 content = u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
304 rrset = {
305 "name": name,
306 "type": "soa", # test uppercasing of type, too.
307 "ttl": 3600,
308 "records": [{
309 "content": content,
310 "disabled": False,
311 }],
312 }
313 name, payload, data = self.create_zone(name=name, rrsets=[rrset], soa_edit_api='')
314 self.assertEquals(get_rrset(data, name, 'SOA')['records'], rrset['records'])
315 dbrecs = get_db_records(name, 'SOA')
316 self.assertEqual(dbrecs[0]['content'], content.replace('. ', ' '))
317
318 def test_create_zone_double_dot(self):
319 name = 'test..' + unique_zone_name()
320 payload = {
321 'name': name,
322 'kind': 'Native',
323 'nameservers': ['ns1.example.com.']
324 }
325 print(payload)
326 r = self.session.post(
327 self.url("/api/v1/servers/localhost/zones"),
328 data=json.dumps(payload),
329 headers={'content-type': 'application/json'})
330 self.assertEquals(r.status_code, 422)
331 self.assertIn('Unable to parse DNS Name', r.json()['error'])
332
333 def test_create_zone_restricted_chars(self):
334 name = 'test:' + unique_zone_name() # : isn't good as a name.
335 payload = {
336 'name': name,
337 'kind': 'Native',
338 'nameservers': ['ns1.example.com']
339 }
340 print(payload)
341 r = self.session.post(
342 self.url("/api/v1/servers/localhost/zones"),
343 data=json.dumps(payload),
344 headers={'content-type': 'application/json'})
345 self.assertEquals(r.status_code, 422)
346 self.assertIn('contains unsupported characters', r.json()['error'])
347
348 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self):
349 name = unique_zone_name()
350 rrset = {
351 "name": name,
352 "type": "NS",
353 "ttl": 3600,
354 "records": [{
355 "content": "ns2.example.com.",
356 "disabled": False,
357 }],
358 }
359 payload = {
360 'name': name,
361 'kind': 'Native',
362 'nameservers': ['ns1.example.com.'],
363 'rrsets': [rrset],
364 }
365 print(payload)
366 r = self.session.post(
367 self.url("/api/v1/servers/localhost/zones"),
368 data=json.dumps(payload),
369 headers={'content-type': 'application/json'})
370 self.assertEquals(r.status_code, 422)
371 self.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r.json()['error'])
372
373 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self):
374 name = unique_zone_name()
375 rrset = {
376 "name": 'subzone.'+name,
377 "type": "NS",
378 "ttl": 3600,
379 "records": [{
380 "content": "ns2.example.com.",
381 "disabled": False,
382 }],
383 }
384 payload = {
385 'name': name,
386 'kind': 'Native',
387 'nameservers': ['ns1.example.com.'],
388 'rrsets': [rrset],
389 }
390 print(payload)
391 r = self.session.post(
392 self.url("/api/v1/servers/localhost/zones"),
393 data=json.dumps(payload),
394 headers={'content-type': 'application/json'})
395 self.assert_success_json(r)
396
397 def test_create_zone_with_symbols(self):
398 name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
399 name = payload['name']
400 expected_id = name.replace('/', '=2F')
401 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
402 self.assertIn(k, data)
403 if k in payload:
404 self.assertEquals(data[k], payload[k])
405 self.assertEquals(data['id'], expected_id)
406 dbrecs = get_db_records(name, 'SOA')
407 self.assertEqual(dbrecs[0]['name'], name.rstrip('.'))
408
409 def test_create_zone_with_nameservers_non_string(self):
410 # ensure we don't crash
411 name = unique_zone_name()
412 payload = {
413 'name': name,
414 'kind': 'Native',
415 'nameservers': [{'a': 'ns1.example.com'}] # invalid
416 }
417 print(payload)
418 r = self.session.post(
419 self.url("/api/v1/servers/localhost/zones"),
420 data=json.dumps(payload),
421 headers={'content-type': 'application/json'})
422 self.assertEquals(r.status_code, 422)
423
424 def test_create_zone_with_dnssec(self):
425 """
426 Create a zone with "dnssec" set and see if a key was made.
427 """
428 name = unique_zone_name()
429 name, payload, data = self.create_zone(dnssec=True)
430
431 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
432
433 for k in ('dnssec', ):
434 self.assertIn(k, data)
435 if k in payload:
436 self.assertEquals(data[k], payload[k])
437
438 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
439
440 keys = r.json()
441
442 print(keys)
443
444 self.assertEquals(r.status_code, 200)
445 self.assertEquals(len(keys), 1)
446 self.assertEquals(keys[0]['type'], 'Cryptokey')
447 self.assertEquals(keys[0]['active'], True)
448 self.assertEquals(keys[0]['keytype'], 'csk')
449
450 def test_create_zone_with_dnssec_disable_dnssec(self):
451 """
452 Create a zone with "dnssec", then set "dnssec" to false and see if the
453 keys are gone
454 """
455 name = unique_zone_name()
456 name, payload, data = self.create_zone(dnssec=True)
457
458 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
459 data=json.dumps({'dnssec': False}))
460 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
461
462 zoneinfo = r.json()
463
464 self.assertEquals(r.status_code, 200)
465 self.assertEquals(zoneinfo['dnssec'], False)
466
467 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
468
469 keys = r.json()
470
471 self.assertEquals(r.status_code, 200)
472 self.assertEquals(len(keys), 0)
473
474 def test_create_zone_with_nsec3param(self):
475 """
476 Create a zone with "nsec3param" set and see if the metadata was added.
477 """
478 name = unique_zone_name()
479 nsec3param = '1 0 500 aabbccddeeff'
480 name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)
481
482 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
483
484 for k in ('dnssec', 'nsec3param'):
485 self.assertIn(k, data)
486 if k in payload:
487 self.assertEquals(data[k], payload[k])
488
489 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3PARAM'))
490
491 data = r.json()
492
493 print(data)
494
495 self.assertEquals(r.status_code, 200)
496 self.assertEquals(len(data['metadata']), 1)
497 self.assertEquals(data['kind'], 'NSEC3PARAM')
498 self.assertEquals(data['metadata'][0], nsec3param)
499
500 def test_create_zone_with_nsec3narrow(self):
501 """
502 Create a zone with "nsec3narrow" set and see if the metadata was added.
503 """
504 name = unique_zone_name()
505 nsec3param = '1 0 500 aabbccddeeff'
506 name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
507 nsec3narrow=True)
508
509 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
510
511 for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
512 self.assertIn(k, data)
513 if k in payload:
514 self.assertEquals(data[k], payload[k])
515
516 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3NARROW'))
517
518 data = r.json()
519
520 print(data)
521
522 self.assertEquals(r.status_code, 200)
523 self.assertEquals(len(data['metadata']), 1)
524 self.assertEquals(data['kind'], 'NSEC3NARROW')
525 self.assertEquals(data['metadata'][0], '1')
526
527 def test_create_zone_dnssec_serial(self):
528 """
529 Create a zone set/unset "dnssec" and see if the serial was increased
530 after every step
531 """
532 name = unique_zone_name()
533 name, payload, data = self.create_zone()
534
535 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
536 self.assertEquals(soa_serial[-2:], '01')
537
538 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
539 data=json.dumps({'dnssec': True}))
540 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
541
542 data = r.json()
543 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
544
545 self.assertEquals(r.status_code, 200)
546 self.assertEquals(soa_serial[-2:], '02')
547
548 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
549 data=json.dumps({'dnssec': False}))
550 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
551
552 data = r.json()
553 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
554
555 self.assertEquals(r.status_code, 200)
556 self.assertEquals(soa_serial[-2:], '03')
557
558 def test_zone_absolute_url(self):
559 name, payload, data = self.create_zone()
560 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
561 rdata = r.json()
562 print(rdata[0])
563 self.assertTrue(rdata[0]['url'].startswith('/api/v'))
564
565 def test_create_zone_metadata(self):
566 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
567 r = self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
568 data=json.dumps(payload_metadata))
569 rdata = r.json()
570 self.assertEquals(r.status_code, 201)
571 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
572
573 def test_create_zone_metadata_kind(self):
574 payload_metadata = {"metadata": ["127.0.0.2"]}
575 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
576 data=json.dumps(payload_metadata))
577 rdata = r.json()
578 self.assertEquals(r.status_code, 200)
579 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
580
581 def test_create_protected_zone_metadata(self):
582 # test whether it prevents modification of certain kinds
583 for k in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
584 payload = {"metadata": ["FOO", "BAR"]}
585 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k),
586 data=json.dumps(payload))
587 self.assertEquals(r.status_code, 422)
588
589 def test_retrieve_zone_metadata(self):
590 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
591 self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
592 data=json.dumps(payload_metadata))
593 r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata"))
594 rdata = r.json()
595 self.assertEquals(r.status_code, 200)
596 self.assertIn(payload_metadata, rdata)
597
598 def test_delete_zone_metadata(self):
599 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
600 self.assertEquals(r.status_code, 200)
601 r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
602 rdata = r.json()
603 self.assertEquals(r.status_code, 200)
604 self.assertEquals(rdata["metadata"], [])
605
606 def test_create_external_zone_metadata(self):
607 payload_metadata = {"metadata": ["My very important message"]}
608 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
609 data=json.dumps(payload_metadata))
610 self.assertEquals(r.status_code, 200)
611 rdata = r.json()
612 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
613
614 def test_create_metadata_in_non_existent_zone(self):
615 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
616 r = self.session.post(self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
617 data=json.dumps(payload_metadata))
618 self.assertEquals(r.status_code, 404)
619 # Note: errors should probably contain json (see #5988)
620 # self.assertIn('Could not find domain ', r.json()['error'])
621
622 def test_create_slave_zone(self):
623 # Test that nameservers can be absent for slave zones.
624 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
625 for k in ('name', 'masters', 'kind'):
626 self.assertIn(k, data)
627 self.assertEquals(data[k], payload[k])
628 print("payload:", payload)
629 print("data:", data)
630 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
631 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
632 zonelist = r.json()
633 print("zonelist:", zonelist)
634 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
635 # Also test that fetching the zone works.
636 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
637 data = r.json()
638 print("zone (fetched):", data)
639 for k in ('name', 'masters', 'kind'):
640 self.assertIn(k, data)
641 self.assertEquals(data[k], payload[k])
642 self.assertEqual(data['serial'], 0)
643 self.assertEqual(data['rrsets'], [])
644
645 def test_find_zone_by_name(self):
646 name = 'foo/' + unique_zone_name()
647 name, payload, data = self.create_zone(name=name)
648 r = self.session.get(self.url("/api/v1/servers/localhost/zones?zone=" + name))
649 data = r.json()
650 print(data)
651 self.assertEquals(data[0]['name'], name)
652
653 def test_delete_slave_zone(self):
654 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
655 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + data['id']))
656 r.raise_for_status()
657
658 def test_retrieve_slave_zone(self):
659 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
660 print("payload:", payload)
661 print("data:", data)
662 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/axfr-retrieve"))
663 data = r.json()
664 print("status for axfr-retrieve:", data)
665 self.assertEqual(data['result'], u'Added retrieval request for \'' + payload['name'] +
666 '\' from master 127.0.0.2')
667
668 def test_notify_master_zone(self):
669 name, payload, data = self.create_zone(kind='Master')
670 print("payload:", payload)
671 print("data:", data)
672 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/notify"))
673 data = r.json()
674 print("status for notify:", data)
675 self.assertEqual(data['result'], 'Notification queued')
676
677 def test_get_zone_with_symbols(self):
678 name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
679 name = payload['name']
680 zone_id = (name.replace('/', '=2F'))
681 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id))
682 data = r.json()
683 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
684 self.assertIn(k, data)
685 if k in payload:
686 self.assertEquals(data[k], payload[k])
687
688 def test_get_zone(self):
689 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
690 domains = r.json()
691 example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
692 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id']))
693 self.assert_success_json(r)
694 data = r.json()
695 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
696 self.assertIn(k, data)
697 self.assertEquals(data['name'], 'example.com.')
698
699 def test_import_zone_broken(self):
700 payload = {}
701 payload['zone'] = """
702 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
703 flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
704 ;; WARNING: recursion requested but not available
705
706 ;; OPT PSEUDOSECTION:
707 ; EDNS: version: 0, flags:; udp: 1680
708 ;; QUESTION SECTION:
709 ;powerdns.com. IN SOA
710
711 ;; ANSWER SECTION:
712 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
713 powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
714 powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
715 powerdns-broken.com. 86400 IN A 82.94.213.34
716 powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
717 powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
718 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
719 """
720 payload['name'] = 'powerdns-broken.com.'
721 payload['kind'] = 'Master'
722 payload['nameservers'] = []
723 r = self.session.post(
724 self.url("/api/v1/servers/localhost/zones"),
725 data=json.dumps(payload),
726 headers={'content-type': 'application/json'})
727 self.assertEquals(r.status_code, 422)
728
729 def test_import_zone_axfr_outofzone(self):
730 # Ensure we don't create out-of-zone records
731 name = unique_zone_name()
732 payload = {}
733 payload['zone'] = """
734 NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
735 NAME 3600 IN NS powerdnssec2.ds9a.nl.
736 example.org. 3600 IN AAAA 2001:888:2000:1d::2
737 NAME 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
738 """.replace('NAME', name)
739 payload['name'] = name
740 payload['kind'] = 'Master'
741 payload['nameservers'] = []
742 r = self.session.post(
743 self.url("/api/v1/servers/localhost/zones"),
744 data=json.dumps(payload),
745 headers={'content-type': 'application/json'})
746 self.assertEquals(r.status_code, 422)
747 self.assertEqual(r.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
748
749 def test_import_zone_axfr(self):
750 payload = {}
751 payload['zone'] = """
752 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
753 ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
754 ;; WARNING: recursion requested but not available
755
756 ;; OPT PSEUDOSECTION:
757 ; EDNS: version: 0, flags:; udp: 1680
758 ;; QUESTION SECTION:
759 ;powerdns.com. IN SOA
760
761 ;; ANSWER SECTION:
762 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
763 powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
764 powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
765 powerdns.com. 86400 IN A 82.94.213.34
766 powerdns.com. 3600 IN MX 0 xs.powerdns.com.
767 powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
768 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
769 """
770 payload['name'] = 'powerdns.com.'
771 payload['kind'] = 'Master'
772 payload['nameservers'] = []
773 payload['soa_edit_api'] = '' # turn off so exact SOA comparison works.
774 r = self.session.post(
775 self.url("/api/v1/servers/localhost/zones"),
776 data=json.dumps(payload),
777 headers={'content-type': 'application/json'})
778 self.assert_success_json(r)
779 data = r.json()
780 self.assertIn('name', data)
781
782 expected = {
783 'NS': [
784 {'content': 'powerdnssec1.ds9a.nl.'},
785 {'content': 'powerdnssec2.ds9a.nl.'},
786 ],
787 'SOA': [
788 {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
789 ],
790 'MX': [
791 {'content': '0 xs.powerdns.com.'},
792 ],
793 'A': [
794 {'content': '82.94.213.34', 'name': 'powerdns.com.'},
795 ],
796 'AAAA': [
797 {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
798 ],
799 }
800
801 eq_zone_rrsets(data['rrsets'], expected)
802
803 # check content in DB is stored WITHOUT trailing dot.
804 dbrecs = get_db_records(payload['name'], 'NS')
805 dbrec = next((dbrec for dbrec in dbrecs if dbrec['content'].startswith('powerdnssec1')))
806 self.assertEqual(dbrec['content'], 'powerdnssec1.ds9a.nl')
807
808 def test_import_zone_bind(self):
809 payload = {}
810 payload['zone'] = """
811 $TTL 86400 ; 24 hours could have been written as 24h or 1d
812 ; $TTL used for all RRs without explicit TTL value
813 $ORIGIN example.org.
814 @ 1D IN SOA ns1.example.org. hostmaster.example.org. (
815 2002022401 ; serial
816 3H ; refresh
817 15 ; retry
818 1w ; expire
819 3h ; minimum
820 )
821 IN NS ns1.example.org. ; in the domain
822 IN NS ns2.smokeyjoe.com. ; external to domain
823 IN MX 10 mail.another.com. ; external mail provider
824 ; server host definitions
825 ns1 IN A 192.168.0.1 ;name server definition
826 www IN A 192.168.0.2 ;web server definition
827 ftp IN CNAME www.example.org. ;ftp server definition
828 ; non server domain hosts
829 bill IN A 192.168.0.3
830 fred IN A 192.168.0.4
831 """
832 payload['name'] = 'example.org.'
833 payload['kind'] = 'Master'
834 payload['nameservers'] = []
835 payload['soa_edit_api'] = '' # turn off so exact SOA comparison works.
836 r = self.session.post(
837 self.url("/api/v1/servers/localhost/zones"),
838 data=json.dumps(payload),
839 headers={'content-type': 'application/json'})
840 self.assert_success_json(r)
841 data = r.json()
842 self.assertIn('name', data)
843
844 expected = {
845 'NS': [
846 {'content': 'ns1.example.org.'},
847 {'content': 'ns2.smokeyjoe.com.'},
848 ],
849 'SOA': [
850 {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
851 ],
852 'MX': [
853 {'content': '10 mail.another.com.'},
854 ],
855 'A': [
856 {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
857 {'content': '192.168.0.2', 'name': 'www.example.org.'},
858 {'content': '192.168.0.3', 'name': 'bill.example.org.'},
859 {'content': '192.168.0.4', 'name': 'fred.example.org.'},
860 ],
861 'CNAME': [
862 {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
863 ],
864 }
865
866 eq_zone_rrsets(data['rrsets'], expected)
867
868 def test_export_zone_json(self):
869 name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
870 # export it
871 r = self.session.get(
872 self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
873 headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
874 )
875 self.assert_success_json(r)
876 data = r.json()
877 self.assertIn('zone', data)
878 expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
879 name + '\t3600\tIN\tNS\tns2.foo.com.',
880 name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
881 ' 0 10800 3600 604800 3600']
882 self.assertEquals(data['zone'].strip().split('\n'), expected_data)
883
884 def test_export_zone_text(self):
885 name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
886 # export it
887 r = self.session.get(
888 self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
889 headers={'accept': '*/*'}
890 )
891 data = r.text.strip().split("\n")
892 expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
893 name + '\t3600\tIN\tNS\tns2.foo.com.',
894 name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
895 ' 0 10800 3600 604800 3600']
896 self.assertEquals(data, expected_data)
897
898 def test_update_zone(self):
899 name, payload, zone = self.create_zone()
900 name = payload['name']
901 # update, set as Master and enable SOA-EDIT-API
902 payload = {
903 'kind': 'Master',
904 'masters': ['192.0.2.1', '192.0.2.2'],
905 'soa_edit_api': 'EPOCH',
906 'soa_edit': 'EPOCH'
907 }
908 r = self.session.put(
909 self.url("/api/v1/servers/localhost/zones/" + name),
910 data=json.dumps(payload),
911 headers={'content-type': 'application/json'})
912 self.assert_success(r)
913 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
914 for k in payload.keys():
915 self.assertIn(k, data)
916 self.assertEquals(data[k], payload[k])
917 # update, back to Native and empty(off)
918 payload = {
919 'kind': 'Native',
920 'soa_edit_api': '',
921 'soa_edit': ''
922 }
923 r = self.session.put(
924 self.url("/api/v1/servers/localhost/zones/" + name),
925 data=json.dumps(payload),
926 headers={'content-type': 'application/json'})
927 self.assert_success(r)
928 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
929 for k in payload.keys():
930 self.assertIn(k, data)
931 self.assertEquals(data[k], payload[k])
932
933 def test_zone_rr_update(self):
934 name, payload, zone = self.create_zone()
935 # do a replace (= update)
936 rrset = {
937 'changetype': 'replace',
938 'name': name,
939 'type': 'ns',
940 'ttl': 3600,
941 'records': [
942 {
943 "content": "ns1.bar.com.",
944 "disabled": False
945 },
946 {
947 "content": "ns2-disabled.bar.com.",
948 "disabled": True
949 }
950 ]
951 }
952 payload = {'rrsets': [rrset]}
953 r = self.session.patch(
954 self.url("/api/v1/servers/localhost/zones/" + name),
955 data=json.dumps(payload),
956 headers={'content-type': 'application/json'})
957 self.assert_success(r)
958 # verify that (only) the new record is there
959 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
960 self.assertEquals(get_rrset(data, name, 'NS')['records'], rrset['records'])
961
962 def test_zone_rr_update_mx(self):
963 # Important to test with MX records, as they have a priority field, which must end up in the content field.
964 name, payload, zone = self.create_zone()
965 # do a replace (= update)
966 rrset = {
967 'changetype': 'replace',
968 'name': name,
969 'type': 'MX',
970 'ttl': 3600,
971 'records': [
972 {
973 "content": "10 mail.example.org.",
974 "disabled": False
975 }
976 ]
977 }
978 payload = {'rrsets': [rrset]}
979 r = self.session.patch(
980 self.url("/api/v1/servers/localhost/zones/" + name),
981 data=json.dumps(payload),
982 headers={'content-type': 'application/json'})
983 self.assert_success(r)
984 # verify that (only) the new record is there
985 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
986 self.assertEquals(get_rrset(data, name, 'MX')['records'], rrset['records'])
987
988 def test_zone_rr_update_invalid_mx(self):
989 name, payload, zone = self.create_zone()
990 # do a replace (= update)
991 rrset = {
992 'changetype': 'replace',
993 'name': name,
994 'type': 'MX',
995 'ttl': 3600,
996 'records': [
997 {
998 "content": "10 mail@mx.example.org.",
999 "disabled": False
1000 }
1001 ]
1002 }
1003 payload = {'rrsets': [rrset]}
1004 r = self.session.patch(
1005 self.url("/api/v1/servers/localhost/zones/" + name),
1006 data=json.dumps(payload),
1007 headers={'content-type': 'application/json'})
1008 self.assertEquals(r.status_code, 422)
1009 self.assertIn('non-hostname content', r.json()['error'])
1010 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1011 self.assertIsNone(get_rrset(data, name, 'MX'))
1012
1013 def test_zone_rr_update_opt(self):
1014 name, payload, zone = self.create_zone()
1015 # do a replace (= update)
1016 rrset = {
1017 'changetype': 'replace',
1018 'name': name,
1019 'type': 'OPT',
1020 'ttl': 3600,
1021 'records': [
1022 {
1023 "content": "9",
1024 "disabled": False
1025 }
1026 ]
1027 }
1028 payload = {'rrsets': [rrset]}
1029 r = self.session.patch(
1030 self.url("/api/v1/servers/localhost/zones/" + name),
1031 data=json.dumps(payload),
1032 headers={'content-type': 'application/json'})
1033 self.assertEquals(r.status_code, 422)
1034 self.assertIn('OPT: invalid type given', r.json()['error'])
1035
1036 def test_zone_rr_update_multiple_rrsets(self):
1037 name, payload, zone = self.create_zone()
1038 rrset1 = {
1039 'changetype': 'replace',
1040 'name': name,
1041 'type': 'NS',
1042 'ttl': 3600,
1043 'records': [
1044 {
1045
1046 "content": "ns9999.example.com.",
1047 "disabled": False
1048 }
1049 ]
1050 }
1051 rrset2 = {
1052 'changetype': 'replace',
1053 'name': name,
1054 'type': 'MX',
1055 'ttl': 3600,
1056 'records': [
1057 {
1058 "content": "10 mx444.example.com.",
1059 "disabled": False
1060 }
1061 ]
1062 }
1063 payload = {'rrsets': [rrset1, rrset2]}
1064 r = self.session.patch(
1065 self.url("/api/v1/servers/localhost/zones/" + name),
1066 data=json.dumps(payload),
1067 headers={'content-type': 'application/json'})
1068 self.assert_success(r)
1069 # verify that all rrsets have been updated
1070 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1071 self.assertEquals(get_rrset(data, name, 'NS')['records'], rrset1['records'])
1072 self.assertEquals(get_rrset(data, name, 'MX')['records'], rrset2['records'])
1073
1074 def test_zone_rr_update_duplicate_record(self):
1075 name, payload, zone = self.create_zone()
1076 rrset = {
1077 'changetype': 'replace',
1078 'name': name,
1079 'type': 'NS',
1080 'ttl': 3600,
1081 'records': [
1082 {"content": "ns9999.example.com.", "disabled": False},
1083 {"content": "ns9996.example.com.", "disabled": False},
1084 {"content": "ns9987.example.com.", "disabled": False},
1085 {"content": "ns9988.example.com.", "disabled": False},
1086 {"content": "ns9999.example.com.", "disabled": False},
1087 ]
1088 }
1089 payload = {'rrsets': [rrset]}
1090 r = self.session.patch(
1091 self.url("/api/v1/servers/localhost/zones/" + name),
1092 data=json.dumps(payload),
1093 headers={'content-type': 'application/json'})
1094 self.assertEquals(r.status_code, 422)
1095 self.assertIn('Duplicate record in RRset', r.json()['error'])
1096
1097 def test_zone_rr_update_duplicate_rrset(self):
1098 name, payload, zone = self.create_zone()
1099 rrset1 = {
1100 'changetype': 'replace',
1101 'name': name,
1102 'type': 'NS',
1103 'ttl': 3600,
1104 'records': [
1105 {
1106 "content": "ns9999.example.com.",
1107 "disabled": False
1108 }
1109 ]
1110 }
1111 rrset2 = {
1112 'changetype': 'replace',
1113 'name': name,
1114 'type': 'NS',
1115 'ttl': 3600,
1116 'records': [
1117 {
1118 "content": "ns9998.example.com.",
1119 "disabled": False
1120 }
1121 ]
1122 }
1123 payload = {'rrsets': [rrset1, rrset2]}
1124 r = self.session.patch(
1125 self.url("/api/v1/servers/localhost/zones/" + name),
1126 data=json.dumps(payload),
1127 headers={'content-type': 'application/json'})
1128 self.assertEquals(r.status_code, 422)
1129 self.assertIn('Duplicate RRset', r.json()['error'])
1130
1131 def test_zone_rr_delete(self):
1132 name, payload, zone = self.create_zone()
1133 # do a delete of all NS records (these are created with the zone)
1134 rrset = {
1135 'changetype': 'delete',
1136 'name': name,
1137 'type': 'NS'
1138 }
1139 payload = {'rrsets': [rrset]}
1140 r = self.session.patch(
1141 self.url("/api/v1/servers/localhost/zones/" + name),
1142 data=json.dumps(payload),
1143 headers={'content-type': 'application/json'})
1144 self.assert_success(r)
1145 # verify that the records are gone
1146 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1147 self.assertIsNone(get_rrset(data, name, 'NS'))
1148
1149 def test_zone_disable_reenable(self):
1150 # This also tests that SOA-EDIT-API works.
1151 name, payload, zone = self.create_zone(soa_edit_api='EPOCH')
1152 # disable zone by disabling SOA
1153 rrset = {
1154 'changetype': 'replace',
1155 'name': name,
1156 'type': 'SOA',
1157 'ttl': 3600,
1158 'records': [
1159 {
1160 "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
1161 "disabled": True
1162 }
1163 ]
1164 }
1165 payload = {'rrsets': [rrset]}
1166 r = self.session.patch(
1167 self.url("/api/v1/servers/localhost/zones/" + name),
1168 data=json.dumps(payload),
1169 headers={'content-type': 'application/json'})
1170 self.assert_success(r)
1171 # check SOA serial has been edited
1172 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1173 soa_serial1 = get_first_rec(data, name, 'SOA')['content'].split()[2]
1174 self.assertNotEquals(soa_serial1, '1')
1175 # make sure domain is still in zone list (disabled SOA!)
1176 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
1177 domains = r.json()
1178 self.assertEquals(len([domain for domain in domains if domain['name'] == name]), 1)
1179 # sleep 1sec to ensure the EPOCH value changes for the next request
1180 time.sleep(1)
1181 # verify that modifying it still works
1182 rrset['records'][0]['disabled'] = False
1183 payload = {'rrsets': [rrset]}
1184 r = self.session.patch(
1185 self.url("/api/v1/servers/localhost/zones/" + name),
1186 data=json.dumps(payload),
1187 headers={'content-type': 'application/json'})
1188 self.assert_success(r)
1189 # check SOA serial has been edited again
1190 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1191 soa_serial2 = get_first_rec(data, name, 'SOA')['content'].split()[2]
1192 self.assertNotEquals(soa_serial2, '1')
1193 self.assertNotEquals(soa_serial2, soa_serial1)
1194
1195 def test_zone_rr_update_out_of_zone(self):
1196 name, payload, zone = self.create_zone()
1197 # replace with qname mismatch
1198 rrset = {
1199 'changetype': 'replace',
1200 'name': 'not-in-zone.',
1201 'type': 'NS',
1202 'ttl': 3600,
1203 'records': [
1204 {
1205 "content": "ns1.bar.com.",
1206 "disabled": False
1207 }
1208 ]
1209 }
1210 payload = {'rrsets': [rrset]}
1211 r = self.session.patch(
1212 self.url("/api/v1/servers/localhost/zones/" + name),
1213 data=json.dumps(payload),
1214 headers={'content-type': 'application/json'})
1215 self.assertEquals(r.status_code, 422)
1216 self.assertIn('out of zone', r.json()['error'])
1217
1218 def test_zone_rr_update_restricted_chars(self):
1219 name, payload, zone = self.create_zone()
1220 # replace with qname mismatch
1221 rrset = {
1222 'changetype': 'replace',
1223 'name': 'test:' + name,
1224 'type': 'NS',
1225 'ttl': 3600,
1226 'records': [
1227 {
1228 "content": "ns1.bar.com.",
1229 "disabled": False
1230 }
1231 ]
1232 }
1233 payload = {'rrsets': [rrset]}
1234 r = self.session.patch(
1235 self.url("/api/v1/servers/localhost/zones/" + name),
1236 data=json.dumps(payload),
1237 headers={'content-type': 'application/json'})
1238 self.assertEquals(r.status_code, 422)
1239 self.assertIn('contains unsupported characters', r.json()['error'])
1240
1241 def test_rrset_unknown_type(self):
1242 name, payload, zone = self.create_zone()
1243 rrset = {
1244 'changetype': 'replace',
1245 'name': name,
1246 'type': 'FAFAFA',
1247 'ttl': 3600,
1248 'records': [
1249 {
1250 "content": "4.3.2.1",
1251 "disabled": False
1252 }
1253 ]
1254 }
1255 payload = {'rrsets': [rrset]}
1256 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1257 headers={'content-type': 'application/json'})
1258 self.assertEquals(r.status_code, 422)
1259 self.assertIn('unknown type', r.json()['error'])
1260
1261 def test_rrset_cname_and_other(self):
1262 name, payload, zone = self.create_zone()
1263 rrset = {
1264 'changetype': 'replace',
1265 'name': name,
1266 'type': 'CNAME',
1267 'ttl': 3600,
1268 'records': [
1269 {
1270 "content": "example.org.",
1271 "disabled": False
1272 }
1273 ]
1274 }
1275 payload = {'rrsets': [rrset]}
1276 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1277 headers={'content-type': 'application/json'})
1278 self.assertEquals(r.status_code, 422)
1279 self.assertIn('Conflicts with pre-existing non-CNAME RRset', r.json()['error'])
1280
1281 def test_rrset_other_and_cname(self):
1282 name, payload, zone = self.create_zone()
1283 rrset = {
1284 'changetype': 'replace',
1285 'name': 'sub.'+name,
1286 'type': 'CNAME',
1287 'ttl': 3600,
1288 'records': [
1289 {
1290 "content": "example.org.",
1291 "disabled": False
1292 }
1293 ]
1294 }
1295 payload = {'rrsets': [rrset]}
1296 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1297 headers={'content-type': 'application/json'})
1298 self.assert_success(r)
1299 rrset = {
1300 'changetype': 'replace',
1301 'name': 'sub.'+name,
1302 'type': 'A',
1303 'ttl': 3600,
1304 'records': [
1305 {
1306 "content": "1.2.3.4",
1307 "disabled": False
1308 }
1309 ]
1310 }
1311 payload = {'rrsets': [rrset]}
1312 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1313 headers={'content-type': 'application/json'})
1314 self.assertEquals(r.status_code, 422)
1315 self.assertIn('Conflicts with pre-existing CNAME RRset', r.json()['error'])
1316
1317 def test_create_zone_with_leading_space(self):
1318 # Actual regression.
1319 name, payload, zone = self.create_zone()
1320 rrset = {
1321 'changetype': 'replace',
1322 'name': name,
1323 'type': 'A',
1324 'ttl': 3600,
1325 'records': [
1326 {
1327 "content": " 4.3.2.1",
1328 "disabled": False
1329 }
1330 ]
1331 }
1332 payload = {'rrsets': [rrset]}
1333 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1334 headers={'content-type': 'application/json'})
1335 self.assertEquals(r.status_code, 422)
1336 self.assertIn('Not in expected format', r.json()['error'])
1337
1338 def test_zone_rr_delete_out_of_zone(self):
1339 name, payload, zone = self.create_zone()
1340 rrset = {
1341 'changetype': 'delete',
1342 'name': 'not-in-zone.',
1343 'type': 'NS'
1344 }
1345 payload = {'rrsets': [rrset]}
1346 r = self.session.patch(
1347 self.url("/api/v1/servers/localhost/zones/" + name),
1348 data=json.dumps(payload),
1349 headers={'content-type': 'application/json'})
1350 print(r.content)
1351 self.assert_success(r) # succeed so users can fix their wrong, old data
1352
1353 def test_zone_delete(self):
1354 name, payload, zone = self.create_zone()
1355 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
1356 self.assertEquals(r.status_code, 204)
1357 self.assertNotIn('Content-Type', r.headers)
1358
1359 def test_zone_comment_create(self):
1360 name, payload, zone = self.create_zone()
1361 rrset = {
1362 'changetype': 'replace',
1363 'name': name,
1364 'type': 'NS',
1365 'ttl': 3600,
1366 'comments': [
1367 {
1368 'account': 'test1',
1369 'content': 'blah blah',
1370 },
1371 {
1372 'account': 'test2',
1373 'content': 'blah blah bleh',
1374 }
1375 ]
1376 }
1377 payload = {'rrsets': [rrset]}
1378 r = self.session.patch(
1379 self.url("/api/v1/servers/localhost/zones/" + name),
1380 data=json.dumps(payload),
1381 headers={'content-type': 'application/json'})
1382 self.assert_success(r)
1383 # make sure the comments have been set, and that the NS
1384 # records are still present
1385 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1386 serverset = get_rrset(data, name, 'NS')
1387 print(serverset)
1388 self.assertNotEquals(serverset['records'], [])
1389 self.assertNotEquals(serverset['comments'], [])
1390 # verify that modified_at has been set by pdns
1391 self.assertNotEquals([c for c in serverset['comments']][0]['modified_at'], 0)
1392 # verify that TTL is correct (regression test)
1393 self.assertEquals(serverset['ttl'], 3600)
1394
1395 def test_zone_comment_delete(self):
1396 # Test: Delete ONLY comments.
1397 name, payload, zone = self.create_zone()
1398 rrset = {
1399 'changetype': 'replace',
1400 'name': name,
1401 'type': 'NS',
1402 'comments': []
1403 }
1404 payload = {'rrsets': [rrset]}
1405 r = self.session.patch(
1406 self.url("/api/v1/servers/localhost/zones/" + name),
1407 data=json.dumps(payload),
1408 headers={'content-type': 'application/json'})
1409 self.assert_success(r)
1410 # make sure the NS records are still present
1411 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1412 serverset = get_rrset(data, name, 'NS')
1413 print(serverset)
1414 self.assertNotEquals(serverset['records'], [])
1415 self.assertEquals(serverset['comments'], [])
1416
1417 def test_zone_comment_stay_intact(self):
1418 # Test if comments on an rrset stay intact if the rrset is replaced
1419 name, payload, zone = self.create_zone()
1420 # create a comment
1421 rrset = {
1422 'changetype': 'replace',
1423 'name': name,
1424 'type': 'NS',
1425 'comments': [
1426 {
1427 'account': 'test1',
1428 'content': 'oh hi there',
1429 'modified_at': 1111
1430 }
1431 ]
1432 }
1433 payload = {'rrsets': [rrset]}
1434 r = self.session.patch(
1435 self.url("/api/v1/servers/localhost/zones/" + name),
1436 data=json.dumps(payload),
1437 headers={'content-type': 'application/json'})
1438 self.assert_success(r)
1439 # replace rrset records
1440 rrset2 = {
1441 'changetype': 'replace',
1442 'name': name,
1443 'type': 'NS',
1444 'ttl': 3600,
1445 'records': [
1446 {
1447 "content": "ns1.bar.com.",
1448 "disabled": False
1449 }
1450 ]
1451 }
1452 payload2 = {'rrsets': [rrset2]}
1453 r = self.session.patch(
1454 self.url("/api/v1/servers/localhost/zones/" + name),
1455 data=json.dumps(payload2),
1456 headers={'content-type': 'application/json'})
1457 self.assert_success(r)
1458 # make sure the comments still exist
1459 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1460 serverset = get_rrset(data, name, 'NS')
1461 print(serverset)
1462 self.assertEquals(serverset['records'], rrset2['records'])
1463 self.assertEquals(serverset['comments'], rrset['comments'])
1464
1465 def test_zone_auto_ptr_ipv4_create(self):
1466 revzone = '4.2.192.in-addr.arpa.'
1467 _, _, revzonedata = self.create_zone(name=revzone)
1468 name = unique_zone_name()
1469 rrset = {
1470 "name": name,
1471 "type": "A",
1472 "ttl": 3600,
1473 "records": [{
1474 "content": "192.2.4.44",
1475 "disabled": False,
1476 "set-ptr": True,
1477 }],
1478 }
1479 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
1480 del rrset['records'][0]['set-ptr']
1481 self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
1482 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1483 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1484 print(revsets)
1485 self.assertEquals(revsets, [{
1486 u'name': u'44.4.2.192.in-addr.arpa.',
1487 u'ttl': 3600,
1488 u'type': u'PTR',
1489 u'comments': [],
1490 u'records': [{
1491 u'content': name,
1492 u'disabled': False,
1493 }],
1494 }])
1495 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1496 self.assertGreater(r['serial'], revzonedata['serial'])
1497
1498 def test_zone_auto_ptr_ipv4_update(self):
1499 revzone = '0.2.192.in-addr.arpa.'
1500 _, _, revzonedata = self.create_zone(name=revzone)
1501 name, payload, zone = self.create_zone()
1502 rrset = {
1503 'changetype': 'replace',
1504 'name': name,
1505 'type': 'A',
1506 'ttl': 3600,
1507 'records': [
1508 {
1509 "content": '192.2.0.2',
1510 "disabled": False,
1511 "set-ptr": True
1512 }
1513 ]
1514 }
1515 payload = {'rrsets': [rrset]}
1516 r = self.session.patch(
1517 self.url("/api/v1/servers/localhost/zones/" + name),
1518 data=json.dumps(payload),
1519 headers={'content-type': 'application/json'})
1520 self.assert_success(r)
1521 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1522 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1523 print(revsets)
1524 self.assertEquals(revsets, [{
1525 u'name': u'2.0.2.192.in-addr.arpa.',
1526 u'ttl': 3600,
1527 u'type': u'PTR',
1528 u'comments': [],
1529 u'records': [{
1530 u'content': name,
1531 u'disabled': False,
1532 }],
1533 }])
1534 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1535 self.assertGreater(r['serial'], revzonedata['serial'])
1536
1537 def test_zone_auto_ptr_ipv6_update(self):
1538 # 2001:DB8::bb:aa
1539 revzone = '8.b.d.0.1.0.0.2.ip6.arpa.'
1540 _, _, revzonedata = self.create_zone(name=revzone)
1541 name, payload, zone = self.create_zone()
1542 rrset = {
1543 'changetype': 'replace',
1544 'name': name,
1545 'type': 'AAAA',
1546 'ttl': 3600,
1547 'records': [
1548 {
1549 "content": '2001:DB8::bb:aa',
1550 "disabled": False,
1551 "set-ptr": True
1552 }
1553 ]
1554 }
1555 payload = {'rrsets': [rrset]}
1556 r = self.session.patch(
1557 self.url("/api/v1/servers/localhost/zones/" + name),
1558 data=json.dumps(payload),
1559 headers={'content-type': 'application/json'})
1560 self.assert_success(r)
1561 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1562 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1563 print(revsets)
1564 self.assertEquals(revsets, [{
1565 u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
1566 u'ttl': 3600,
1567 u'type': u'PTR',
1568 u'comments': [],
1569 u'records': [{
1570 u'content': name,
1571 u'disabled': False,
1572 }],
1573 }])
1574 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1575 self.assertGreater(r['serial'], revzonedata['serial'])
1576
1577 def test_search_rr_exact_zone(self):
1578 name = unique_zone_name()
1579 self.create_zone(name=name, serial=22, soa_edit_api='')
1580 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')))
1581 self.assert_success_json(r)
1582 print(r.json())
1583 self.assertEquals(r.json(), [
1584 {u'object_type': u'zone', u'name': name, u'zone_id': name},
1585 {u'content': u'ns1.example.com.',
1586 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1587 u'ttl': 3600, u'type': u'NS', u'name': name},
1588 {u'content': u'ns2.example.com.',
1589 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1590 u'ttl': 3600, u'type': u'NS', u'name': name},
1591 {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
1592 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1593 u'ttl': 3600, u'type': u'SOA', u'name': name},
1594 ])
1595
1596 def test_search_rr_substring(self):
1597 name = unique_zone_name()
1598 search = name[5:-5]
1599 self.create_zone(name=name)
1600 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
1601 self.assert_success_json(r)
1602 print(r.json())
1603 # should return zone, SOA, ns1, ns2
1604 self.assertEquals(len(r.json()), 4)
1605
1606 def test_search_rr_case_insensitive(self):
1607 name = unique_zone_name()+'testsuffix.'
1608 self.create_zone(name=name)
1609 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
1610 self.assert_success_json(r)
1611 print(r.json())
1612 # should return zone, SOA, ns1, ns2
1613 self.assertEquals(len(r.json()), 4)
1614
1615 def test_search_after_rectify_with_ent(self):
1616 name = unique_zone_name()
1617 search = name.split('.')[0]
1618 rrset = {
1619 "name": 'sub.sub.' + name,
1620 "type": "A",
1621 "ttl": 3600,
1622 "records": [{
1623 "content": "4.3.2.1",
1624 "disabled": False,
1625 }],
1626 }
1627 self.create_zone(name=name, rrsets=[rrset])
1628 pdnsutil_rectify(name)
1629 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
1630 self.assert_success_json(r)
1631 print(r.json())
1632 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
1633 self.assertEquals(len(r.json()), 5)
1634
1635 def test_default_api_rectify(self):
1636 name = unique_zone_name()
1637 search = name.split('.')[0]
1638 rrsets = [
1639 {
1640 "name": 'a.' + name,
1641 "type": "AAAA",
1642 "ttl": 3600,
1643 "records": [{
1644 "content": "2001:DB8::1",
1645 "disabled": False,
1646 }],
1647 },
1648 {
1649 "name": 'b.' + name,
1650 "type": "AAAA",
1651 "ttl": 3600,
1652 "records": [{
1653 "content": "2001:DB8::2",
1654 "disabled": False,
1655 }],
1656 },
1657 ]
1658 self.create_zone(name=name, rrsets=rrsets, dnssec=True, nsec3param='1 0 1 ab')
1659 dbrecs = get_db_records(name, 'AAAA')
1660 self.assertIsNotNone(dbrecs[0]['ordername'])
1661
1662 def test_override_api_rectify(self):
1663 name = unique_zone_name()
1664 search = name.split('.')[0]
1665 rrsets = [
1666 {
1667 "name": 'a.' + name,
1668 "type": "AAAA",
1669 "ttl": 3600,
1670 "records": [{
1671 "content": "2001:DB8::1",
1672 "disabled": False,
1673 }],
1674 },
1675 {
1676 "name": 'b.' + name,
1677 "type": "AAAA",
1678 "ttl": 3600,
1679 "records": [{
1680 "content": "2001:DB8::2",
1681 "disabled": False,
1682 }],
1683 },
1684 ]
1685 self.create_zone(name=name, rrsets=rrsets, api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
1686 dbrecs = get_db_records(name, 'AAAA')
1687 self.assertIsNone(dbrecs[0]['ordername'])
1688
1689 def test_cname_at_ent_place(self):
1690 name, payload, zone = self.create_zone(api_rectify=True)
1691 rrset = {
1692 'changetype': 'replace',
1693 'name': 'sub2.sub1.' + name,
1694 'type': "A",
1695 'ttl': 3600,
1696 'records': [{
1697 'content': "4.3.2.1",
1698 'disabled': False,
1699 }],
1700 }
1701 payload = {'rrsets': [rrset]}
1702 r = self.session.patch(
1703 self.url("/api/v1/servers/localhost/zones/" + zone['id']),
1704 data=json.dumps(payload),
1705 headers={'content-type': 'application/json'})
1706 self.assertEquals(r.status_code, 204)
1707 rrset = {
1708 'changetype': 'replace',
1709 'name': 'sub1.' + name,
1710 'type': "CNAME",
1711 'ttl': 3600,
1712 'records': [{
1713 'content': "www.example.org.",
1714 'disabled': False,
1715 }],
1716 }
1717 payload = {'rrsets': [rrset]}
1718 r = self.session.patch(
1719 self.url("/api/v1/servers/localhost/zones/" + zone['id']),
1720 data=json.dumps(payload),
1721 headers={'content-type': 'application/json'})
1722 self.assertEquals(r.status_code, 204)
1723
1724 def test_rrset_parameter_post_false(self):
1725 name = unique_zone_name()
1726 payload = {
1727 'name': name,
1728 'kind': 'Native',
1729 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
1730 }
1731 r = self.session.post(
1732 self.url("/api/v1/servers/localhost/zones?rrsets=false"),
1733 data=json.dumps(payload),
1734 headers={'content-type': 'application/json'})
1735 print(r.json())
1736 self.assert_success_json(r)
1737 self.assertEquals(r.status_code, 201)
1738 self.assertEquals(r.json().get('rrsets'), None)
1739
1740 def test_rrset_false_parameter(self):
1741 name = unique_zone_name()
1742 self.create_zone(name=name, kind='Native')
1743 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false"))
1744 self.assert_success_json(r)
1745 print(r.json())
1746 self.assertEquals(r.json().get('rrsets'), None)
1747
1748 def test_rrset_true_parameter(self):
1749 name = unique_zone_name()
1750 self.create_zone(name=name, kind='Native')
1751 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true"))
1752 self.assert_success_json(r)
1753 print(r.json())
1754 self.assertEquals(len(r.json().get('rrsets')), 2)
1755
1756 def test_wrong_rrset_parameter(self):
1757 name = unique_zone_name()
1758 self.create_zone(name=name, kind='Native')
1759 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=foobar"))
1760 self.assertEquals(r.status_code, 422)
1761 self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
1762
1763 def test_put_master_tsig_key_ids_non_existent(self):
1764 name = unique_zone_name()
1765 keyname = unique_zone_name().split('.')[0]
1766 self.create_zone(name=name, kind='Native')
1767 payload = {
1768 'master_tsig_key_ids': [keyname]
1769 }
1770 r = self.session.put(self.url('/api/v1/servers/localhost/zones/' + name),
1771 data=json.dumps(payload),
1772 headers={'content-type': 'application/json'})
1773 self.assertEquals(r.status_code, 422)
1774 self.assertIn('A TSIG key with the name', r.json()['error'])
1775
1776 def test_put_slave_tsig_key_ids_non_existent(self):
1777 name = unique_zone_name()
1778 keyname = unique_zone_name().split('.')[0]
1779 self.create_zone(name=name, kind='Native')
1780 payload = {
1781 'slave_tsig_key_ids': [keyname]
1782 }
1783 r = self.session.put(self.url('/api/v1/servers/localhost/zones/' + name),
1784 data=json.dumps(payload),
1785 headers={'content-type': 'application/json'})
1786 self.assertEquals(r.status_code, 422)
1787 self.assertIn('A TSIG key with the name', r.json()['error'])
1788
1789
1790 @unittest.skipIf(not is_auth(), "Not applicable")
1791 class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
1792
1793 def setUp(self):
1794 super(AuthRootZone, self).setUp()
1795 # zone name is not unique, so delete the zone before each individual test.
1796 self.session.delete(self.url("/api/v1/servers/localhost/zones/=2E"))
1797
1798 def test_create_zone(self):
1799 name, payload, data = self.create_zone(name='.', serial=22, soa_edit_api='')
1800 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
1801 self.assertIn(k, data)
1802 if k in payload:
1803 self.assertEquals(data[k], payload[k])
1804 # validate generated SOA
1805 rec = get_first_rec(data, '.', 'SOA')
1806 self.assertEquals(
1807 rec['content'],
1808 "a.misconfigured.powerdns.server. hostmaster. " + str(payload['serial']) +
1809 " 10800 3600 604800 3600"
1810 )
1811 # Regression test: verify zone list works
1812 zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
1813 print("zonelist:", zonelist)
1814 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
1815 # Also test that fetching the zone works.
1816 print("id:", data['id'])
1817 self.assertEquals(data['id'], '=2E')
1818 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
1819 print("zone (fetched):", data)
1820 for k in ('name', 'kind'):
1821 self.assertIn(k, data)
1822 self.assertEquals(data[k], payload[k])
1823 self.assertEqual(data['rrsets'][0]['name'], '.')
1824
1825 def test_update_zone(self):
1826 name, payload, zone = self.create_zone(name='.')
1827 zone_id = '=2E'
1828 # update, set as Master and enable SOA-EDIT-API
1829 payload = {
1830 'kind': 'Master',
1831 'masters': ['192.0.2.1', '192.0.2.2'],
1832 'soa_edit_api': 'EPOCH',
1833 'soa_edit': 'EPOCH'
1834 }
1835 r = self.session.put(
1836 self.url("/api/v1/servers/localhost/zones/" + zone_id),
1837 data=json.dumps(payload),
1838 headers={'content-type': 'application/json'})
1839 self.assert_success(r)
1840 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
1841 for k in payload.keys():
1842 self.assertIn(k, data)
1843 self.assertEquals(data[k], payload[k])
1844 # update, back to Native and empty(off)
1845 payload = {
1846 'kind': 'Native',
1847 'soa_edit_api': '',
1848 'soa_edit': ''
1849 }
1850 r = self.session.put(
1851 self.url("/api/v1/servers/localhost/zones/" + zone_id),
1852 data=json.dumps(payload),
1853 headers={'content-type': 'application/json'})
1854 self.assert_success(r)
1855 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
1856 for k in payload.keys():
1857 self.assertIn(k, data)
1858 self.assertEquals(data[k], payload[k])
1859
1860
1861 @unittest.skipIf(not is_recursor(), "Not applicable")
1862 class RecursorZones(ApiTestCase):
1863
1864 def create_zone(self, name=None, kind=None, rd=False, servers=None):
1865 if name is None:
1866 name = unique_zone_name()
1867 if servers is None:
1868 servers = []
1869 payload = {
1870 'name': name,
1871 'kind': kind,
1872 'servers': servers,
1873 'recursion_desired': rd
1874 }
1875 r = self.session.post(
1876 self.url("/api/v1/servers/localhost/zones"),
1877 data=json.dumps(payload),
1878 headers={'content-type': 'application/json'})
1879 self.assert_success_json(r)
1880 return payload, r.json()
1881
1882 def test_create_auth_zone(self):
1883 payload, data = self.create_zone(kind='Native')
1884 for k in payload.keys():
1885 self.assertEquals(data[k], payload[k])
1886
1887 def test_create_zone_no_name(self):
1888 payload = {
1889 'name': '',
1890 'kind': 'Native',
1891 'servers': ['8.8.8.8'],
1892 'recursion_desired': False,
1893 }
1894 print(payload)
1895 r = self.session.post(
1896 self.url("/api/v1/servers/localhost/zones"),
1897 data=json.dumps(payload),
1898 headers={'content-type': 'application/json'})
1899 self.assertEquals(r.status_code, 422)
1900 self.assertIn('is not canonical', r.json()['error'])
1901
1902 def test_create_forwarded_zone(self):
1903 payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
1904 # return values are normalized
1905 payload['servers'][0] += ':53'
1906 for k in payload.keys():
1907 self.assertEquals(data[k], payload[k])
1908
1909 def test_create_forwarded_rd_zone(self):
1910 payload, data = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
1911 # return values are normalized
1912 payload['servers'][0] += ':53'
1913 for k in payload.keys():
1914 self.assertEquals(data[k], payload[k])
1915
1916 def test_create_auth_zone_with_symbols(self):
1917 payload, data = self.create_zone(name='foo/bar.'+unique_zone_name(), kind='Native')
1918 expected_id = (payload['name'].replace('/', '=2F'))
1919 for k in payload.keys():
1920 self.assertEquals(data[k], payload[k])
1921 self.assertEquals(data['id'], expected_id)
1922
1923 def test_rename_auth_zone(self):
1924 payload, data = self.create_zone(kind='Native')
1925 name = payload['name']
1926 # now rename it
1927 payload = {
1928 'name': 'renamed-'+name,
1929 'kind': 'Native',
1930 'recursion_desired': False
1931 }
1932 r = self.session.put(
1933 self.url("/api/v1/servers/localhost/zones/" + name),
1934 data=json.dumps(payload),
1935 headers={'content-type': 'application/json'})
1936 self.assert_success(r)
1937 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + payload['name'])).json()
1938 for k in payload.keys():
1939 self.assertEquals(data[k], payload[k])
1940
1941 def test_zone_delete(self):
1942 payload, zone = self.create_zone(kind='Native')
1943 name = payload['name']
1944 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
1945 self.assertEquals(r.status_code, 204)
1946 self.assertNotIn('Content-Type', r.headers)
1947
1948 def test_search_rr_exact_zone(self):
1949 name = unique_zone_name()
1950 self.create_zone(name=name, kind='Native')
1951 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name))
1952 self.assert_success_json(r)
1953 print(r.json())
1954 self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name}])
1955
1956 def test_search_rr_substring(self):
1957 name = 'search-rr-zone.name.'
1958 self.create_zone(name=name, kind='Native')
1959 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
1960 self.assert_success_json(r)
1961 print(r.json())
1962 # should return zone, SOA
1963 self.assertEquals(len(r.json()), 2)
1964
1965 @unittest.skipIf(not is_auth(), "Not applicable")
1966 class AuthZoneKeys(ApiTestCase, AuthZonesHelperMixin):
1967
1968 def test_get_keys(self):
1969 r = self.session.get(
1970 self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
1971 self.assert_success_json(r)
1972 keys = r.json()
1973 self.assertGreater(len(keys), 0)
1974
1975 key0 = deepcopy(keys[0])
1976 del key0['dnskey']
1977 del key0['ds']
1978 expected = {
1979 u'algorithm': u'ECDSAP256SHA256',
1980 u'bits': 256,
1981 u'active': True,
1982 u'type': u'Cryptokey',
1983 u'keytype': u'csk',
1984 u'flags': 257,
1985 u'id': 1}
1986 self.assertEquals(key0, expected)
1987
1988 keydata = keys[0]['dnskey'].split()
1989 self.assertEqual(len(keydata), 4)