git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
Merge pull request #7872 from Habbie/auth-lua-reuse-docs
[thirdparty/pdns.git] / regression-tests.api / test_Zones.py
1 from __future__ import print_function
2 import json
3 import time
4 import unittest
5 from copy import deepcopy
6 from parameterized import parameterized
7 from pprint import pprint
8 from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor, get_db_records, pdnsutil_rectify
9
10
def get_rrset(data, qname, qtype):
    """Return the first entry of data['rrsets'] whose name and type match.

    Returns None when no entry has both name == qname and type == qtype.
    """
    matches = (
        candidate for candidate in data['rrsets']
        if candidate['name'] == qname and candidate['type'] == qtype
    )
    return next(matches, None)
16
17
def get_first_rec(data, qname, qtype):
    """Return the first record of the rrset matching qname/qtype.

    Returns None when get_rrset() finds nothing (or yields a falsy value).
    """
    matched = get_rrset(data, qname, qtype)
    if not matched:
        return None
    return matched['records'][0]
23
24
def eq_zone_rrsets(rrsets, expected):
    """Assert that rrsets hold exactly the records described by expected.

    expected maps a record type to a list of dicts with a 'content' key and
    an optional 'name' key. Only the types listed in expected are compared.
    For a given type, owner names take part in the comparison only if at
    least one expected record of that type carries a 'name'; otherwise every
    owner is normalised to '@'.

    Both sides are printed before the comparison; AssertionError is raised
    on mismatch.
    """
    data_got = {}
    data_expected = {}
    for rtype, wanted_records in expected.items():
        rtype = str(rtype)
        got = data_got[rtype] = set()
        wanted = data_expected[rtype] = set()
        uses_name = any('name' in wanted_record for wanted_record in wanted_records)

        # minify + convert received data
        same_type = [candidate for candidate in rrsets if candidate['type'] == rtype]
        for candidate in same_type:
            print(candidate)
            got.update(
                (candidate['name'] if uses_name else '@', candidate['type'], rec['content'])
                for rec in candidate['records']
            )

        # minify expected data
        wanted.update(
            (rec['name'] if uses_name else '@', rtype, rec['content'])
            for rec in wanted_records
        )

    print("eq_zone_rrsets: got:")
    pprint(data_got)
    print("eq_zone_rrsets: expected:")
    pprint(data_expected)

    assert data_got == data_expected, "%r != %r" % (data_got, data_expected)
48
49
class Zones(ApiTestCase):
    """Zone listing test; field expectations branch on is_auth() / is_recursor()."""

    def test_list_zones(self):
        """List all zones and check the fields reported for example.com.

        Exactly one listed zone must be named 'example.com' or
        'example.com.'. Under is_auth() the serial must additionally be
        non-zero.
        """
        r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
        self.assert_success_json(r)
        domains = r.json()
        example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
        self.assertEqual(len(example_com), 1)
        example_com = example_com[0]
        print(example_com)
        required_fields = ['id', 'url', 'name', 'kind']
        if is_auth():
            required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'edited_serial',
                                                 'serial', 'account']
            self.assertNotEqual(example_com['serial'], 0)
        elif is_recursor():
            required_fields = required_fields + ['recursion_desired', 'servers']
        for field in required_fields:
            self.assertIn(field, example_com)
68
69
class AuthZonesHelperMixin(object):
    """Mixin that adds create_zone() to a test case.

    The host class must supply session, url(), assert_success_json() and
    assertEqual().
    """

    def create_zone(self, name=None, **kwargs):
        """POST a new zone and return the tuple (name, payload, reply).

        name defaults to unique_zone_name(). The base payload describes a
        'Native' zone with two example.com nameservers. Every keyword
        argument overrides the payload key of the same name; a value of
        None removes that key from the payload instead.

        Asserts that the response is successful JSON with status 201.
        """
        if name is None:
            name = unique_zone_name()
        payload = {
            'name': name,
            'kind': 'Native',
            'nameservers': ['ns1.example.com.', 'ns2.example.com.']
        }
        for k, v in kwargs.items():
            if v is None:
                # pop() rather than del: asking to drop a key that is not in
                # the base payload must not raise KeyError.
                payload.pop(k, None)
            else:
                payload[k] = v
        print("sending", payload)
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success_json(r)
        self.assertEqual(r.status_code, 201)
        reply = r.json()
        print("reply", reply)
        return name, payload, reply
94
95
96 @unittest.skipIf(not is_auth(), "Not applicable")
97 class AuthZones(ApiTestCase, AuthZonesHelperMixin):
98
def test_create_zone(self):
    """Create a zone with a fixed serial and check the reply and the stored SOA."""
    # soa_edit_api has a default, override with empty for this test
    name, payload, data = self.create_zone(serial=22, soa_edit_api='')
    expected_keys = ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial',
                     'edited_serial', 'soa_edit_api', 'soa_edit', 'account')
    for k in expected_keys:
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
    # validate generated SOA
    expected_soa = "a.misconfigured.powerdns.server. hostmaster." + name + " " + \
                   str(payload['serial']) + " 10800 3600 604800 3600"
    self.assertEqual(
        get_first_rec(data, name, 'SOA')['content'],
        expected_soa
    )
    # Because we had confusion about dots, check that the DB is without dots.
    dbrecs = get_db_records(name, 'SOA')
    self.assertEqual(dbrecs[0]['content'], expected_soa.replace('. ', ' '))
    self.assertNotEqual(data['serial'], data['edited_serial'])
117
def test_create_zone_with_soa_edit_api(self):
    """With soa_edit_api='EPOCH' the reported serial must exceed the one passed in."""
    # soa_edit_api wins over serial
    name, payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
    for k in ('soa_edit_api', ):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
    # generated EPOCH serial surely is > fixed serial we passed in
    print(data)
    self.assertGreater(data['serial'], payload['serial'])
    # The serial is the third space-separated field of the SOA content.
    soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
    self.assertGreater(soa_serial, payload['serial'])
    self.assertEqual(soa_serial, data['serial'])
131
def test_create_zone_with_account(self):
    """The 'account' value passed at creation must be echoed in the reply."""
    name, payload, data = self.create_zone(account='anaccount', serial=10)
    print(data)
    for k in ('account', ):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
140
def test_create_zone_default_soa_edit_api(self):
    """A zone created without soa_edit_api must report 'DEFAULT' for it."""
    name, payload, data = self.create_zone()
    print(data)
    self.assertEqual(data['soa_edit_api'], 'DEFAULT')
145
def test_create_zone_exists(self):
    """POSTing a zone whose name is already in use must yield status 409."""
    name, payload, data = self.create_zone()
    print(data)
    payload = {
        'name': name,
        'kind': 'Native'
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 409)  # Conflict - already exists
159
def test_create_zone_with_soa_edit(self):
    """Check the SOA serial suffix before and after one PATCH.

    With soa_edit='INCEPTION-INCREMENT' and soa_edit_api='SOA-EDIT-INCREASE'
    the serial must end in '01' right after creation and in '02' once an A
    rrset has been replaced.
    """
    name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
    print(data)
    self.assertEqual(data['soa_edit'], 'INCEPTION-INCREMENT')
    self.assertEqual(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    # These particular settings lead to the first serial set to YYYYMMDD01.
    self.assertEqual(soa_serial[-2:], '01')
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": "127.0.0.1",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    # The PATCH response itself is not inspected; the serial check below is
    # what shows whether the change was applied.
    self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + data['id']),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
    data = r.json()
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    self.assertEqual(soa_serial[-2:], '02')
189
def test_create_zone_with_records(self):
    """An A rrset supplied at creation must come back with the same records."""
    name = unique_zone_name()
    rrset = {
        "name": name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "4.3.2.1",
            "disabled": False,
        }],
    }
    name, payload, data = self.create_zone(name=name, rrsets=[rrset])
    # check our record has appeared
    self.assertEqual(get_rrset(data, name, 'A')['records'], rrset['records'])
204
def test_create_zone_with_wildcard_records(self):
    """A wildcard ('*.<zone>') A rrset supplied at creation must come back unchanged."""
    name = unique_zone_name()
    rrset = {
        "name": "*."+name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "4.3.2.1",
            "disabled": False,
        }],
    }
    name, payload, data = self.create_zone(name=name, rrsets=[rrset])
    # check our record has appeared
    self.assertEqual(get_rrset(data, rrset['name'], 'A')['records'], rrset['records'])
219
def test_create_zone_with_comments(self):
    """Create a zone with rrset comments and check which rrsets report them.

    The SOA and AAAA rrsets are sent with comments, the TXT and A rrsets
    without; the reply must show the same comments for the former and empty
    comment lists for the latter.
    """
    name = unique_zone_name()
    rrsets = [
        {
            "name": name,
            "type": "soa",  # test uppercasing of type, too.
            "comments": [{
                "account": "test1",
                "content": "blah blah",
                "modified_at": 11112,
            }],
        },
        {
            "name": name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::1",
                "disabled": False,
            }],
            "comments": [{
                "account": "test AAAA",
                "content": "blah blah AAAA",
                "modified_at": 11112,
            }],
        },
        {
            "name": name,
            "type": "TXT",
            "ttl": 3600,
            "records": [{
                "content": "\"test TXT\"",
                "disabled": False,
            }],
        },
        {
            "name": name,
            "type": "A",
            "ttl": 3600,
            "records": [{
                "content": "192.0.2.1",
                "disabled": False,
            }],
        },
    ]
    name, payload, data = self.create_zone(name=name, rrsets=rrsets)
    # NS records have been created
    self.assertEqual(len(data['rrsets']), len(rrsets) + 1)
    # check our comment has appeared
    self.assertEqual(get_rrset(data, name, 'SOA')['comments'], rrsets[0]['comments'])
    self.assertEqual(get_rrset(data, name, 'A')['comments'], [])
    self.assertEqual(get_rrset(data, name, 'TXT')['comments'], [])
    self.assertEqual(get_rrset(data, name, 'AAAA')['comments'], rrsets[1]['comments'])
273
def test_create_zone_uncanonical_nameservers(self):
    """A nameserver without trailing dot must be rejected with status 422."""
    name = unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['uncanon.example.com']
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Nameserver is not canonical', r.json()['error'])
288
def test_create_auth_zone_no_name(self):
    """An empty zone name must be rejected with status 422."""
    payload = {
        'name': '',
        'kind': 'Native',
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('is not canonical', r.json()['error'])
302
def test_create_zone_with_custom_soa(self):
    """A caller-supplied SOA rrset must be returned as sent and stored without trailing dots."""
    name = unique_zone_name()
    content = u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
    rrset = {
        "name": name,
        "type": "soa",  # test uppercasing of type, too.
        "ttl": 3600,
        "records": [{
            "content": content,
            "disabled": False,
        }],
    }
    name, payload, data = self.create_zone(name=name, rrsets=[rrset], soa_edit_api='')
    self.assertEqual(get_rrset(data, name, 'SOA')['records'], rrset['records'])
    dbrecs = get_db_records(name, 'SOA')
    # The DB row is expected to hold the same content with each '. ' turned into ' '.
    self.assertEqual(dbrecs[0]['content'], content.replace('. ', ' '))
319
def test_create_zone_double_dot(self):
    """A zone name containing '..' must be rejected with status 422."""
    name = 'test..' + unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.']
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Unable to parse DNS Name', r.json()['error'])
334
def test_create_zone_restricted_chars(self):
    """A zone name containing ':' must be rejected with status 422."""
    name = 'test:' + unique_zone_name()  # : isn't good as a name.
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com']
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('contains unsupported characters', r.json()['error'])
349
def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self):
    """Sending both 'nameservers' and an NS rrset at the zone name must yield status 422."""
    name = unique_zone_name()
    rrset = {
        "name": name,
        "type": "NS",
        "ttl": 3600,
        "records": [{
            "content": "ns2.example.com.",
            "disabled": False,
        }],
    }
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.'],
        'rrsets': [rrset],
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r.json()['error'])
374
def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self):
    """'nameservers' combined with an NS rrset at 'subzone.<zone>' must be accepted."""
    zone_name = unique_zone_name()
    delegation = {
        "name": 'subzone.' + zone_name,
        "type": "NS",
        "ttl": 3600,
        "records": [{
            "content": "ns2.example.com.",
            "disabled": False,
        }],
    }
    payload = {
        'name': zone_name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.'],
        'rrsets': [delegation],
    }
    print(payload)
    body = json.dumps(payload)
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=body,
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
398
def test_create_zone_with_symbols(self):
    """A zone name containing '/' must yield an id where '/' is written as '=2F'."""
    name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
    name = payload['name']
    expected_id = name.replace('/', '=2F')
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
    self.assertEqual(data['id'], expected_id)
    dbrecs = get_db_records(name, 'SOA')
    # The DB row name is compared against the zone name minus trailing dots.
    self.assertEqual(dbrecs[0]['name'], name.rstrip('.'))
410
def test_create_zone_with_nameservers_non_string(self):
    """A 'nameservers' list holding a dict instead of a string must yield status 422."""
    # ensure we don't crash
    name = unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': [{'a': 'ns1.example.com'}]  # invalid
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
425
def test_create_zone_with_dnssec(self):
    """
    Create a zone with "dnssec" set and see if a key was made.
    """
    name, payload, data = self.create_zone(dnssec=True)

    # The zone is fetched once, but this response is not inspected; the
    # checks below look at the creation reply.
    self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    for k in ('dnssec', ):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))

    keys = r.json()

    print(keys)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(keys), 1)
    self.assertEqual(keys[0]['type'], 'Cryptokey')
    self.assertEqual(keys[0]['active'], True)
    self.assertEqual(keys[0]['keytype'], 'csk')
451
def test_create_zone_with_dnssec_disable_dnssec(self):
    """
    Create a zone with "dnssec", then set "dnssec" to false and see if the
    keys are gone
    """
    name, payload, data = self.create_zone(dnssec=True)

    # The PUT response is not inspected; the following GETs show the result.
    self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
                     data=json.dumps({'dnssec': False}))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    zoneinfo = r.json()

    self.assertEqual(r.status_code, 200)
    self.assertEqual(zoneinfo['dnssec'], False)

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))

    keys = r.json()

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(keys), 0)
475
def test_create_zone_with_nsec3param(self):
    """
    Create a zone with "nsec3param" set and see if the metadata was added.
    """
    nsec3param = '1 0 500 aabbccddeeff'
    name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)

    # The zone is fetched once, but this response is not inspected; the
    # loop below looks at the creation reply.
    self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    for k in ('dnssec', 'nsec3param'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3PARAM'))

    data = r.json()

    print(data)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(data['metadata']), 1)
    self.assertEqual(data['kind'], 'NSEC3PARAM')
    self.assertEqual(data['metadata'][0], nsec3param)
501
def test_create_zone_with_nsec3narrow(self):
    """
    Create a zone with "nsec3narrow" set and see if the metadata was added.
    """
    nsec3param = '1 0 500 aabbccddeeff'
    name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
                                           nsec3narrow=True)

    # The zone is fetched once, but this response is not inspected; the
    # loop below looks at the creation reply.
    self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3NARROW'))

    data = r.json()

    print(data)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(data['metadata']), 1)
    self.assertEqual(data['kind'], 'NSEC3NARROW')
    self.assertEqual(data['metadata'][0], '1')
528
def test_create_zone_dnssec_serial(self):
    """
    Create a zone set/unset "dnssec" and see if the serial was increased
    after every step
    """
    name, payload, data = self.create_zone()

    # The serial is the third space-separated field of the SOA content;
    # only its last two characters are compared at each step.
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    self.assertEqual(soa_serial[-2:], '01')

    self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
                     data=json.dumps({'dnssec': True}))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    data = r.json()
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]

    self.assertEqual(r.status_code, 200)
    self.assertEqual(soa_serial[-2:], '02')

    self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
                     data=json.dumps({'dnssec': False}))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    data = r.json()
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]

    self.assertEqual(r.status_code, 200)
    self.assertEqual(soa_serial[-2:], '03')
559
def test_zone_absolute_url(self):
    """The 'url' of the first listed zone must start with '/api/v'."""
    # Create a zone first; its reply is not needed here.
    self.create_zone()
    listing = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
    first_zone = listing[0]
    print(first_zone)
    self.assertTrue(first_zone['url'].startswith('/api/v'))
566
def test_create_zone_metadata(self):
    """POSTing AXFR-SOURCE metadata for example.com must return 201 and echo the values."""
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    r = self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
                          data=json.dumps(payload_metadata))
    rdata = r.json()
    self.assertEqual(r.status_code, 201)
    self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
574
def test_create_zone_metadata_kind(self):
    """PUTting metadata to the AXFR-SOURCE kind URL must return 200 and echo the values."""
    payload_metadata = {"metadata": ["127.0.0.2"]}
    r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
                         data=json.dumps(payload_metadata))
    rdata = r.json()
    self.assertEqual(r.status_code, 200)
    self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
582
def test_create_protected_zone_metadata(self):
    """PUT on each protected metadata kind must be refused with status 422."""
    # The request body is the same for every kind, so build it once.
    payload = {"metadata": ["FOO", "BAR"]}
    # test whether it prevents modification of certain kinds
    for k in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
        r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k),
                             data=json.dumps(payload))
        self.assertEqual(r.status_code, 422)
590
def test_retrieve_zone_metadata(self):
    """Metadata POSTed for example.com must appear in the zone's metadata listing."""
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
                      data=json.dumps(payload_metadata))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata"))
    rdata = r.json()
    self.assertEqual(r.status_code, 200)
    self.assertIn(payload_metadata, rdata)
599
def test_delete_zone_metadata(self):
    """After DELETE on the AXFR-SOURCE kind, a GET must report an empty metadata list."""
    r = self.session.delete(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
    self.assertEqual(r.status_code, 200)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
    rdata = r.json()
    self.assertEqual(r.status_code, 200)
    self.assertEqual(rdata["metadata"], [])
607
def test_create_external_zone_metadata(self):
    """PUTting metadata under the custom kind 'X-MYMETA' must return 200 and echo the values."""
    payload_metadata = {"metadata": ["My very important message"]}
    r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
                         data=json.dumps(payload_metadata))
    self.assertEqual(r.status_code, 200)
    rdata = r.json()
    self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
615
def test_create_metadata_in_non_existent_zone(self):
    """POSTing metadata for a zone that does not exist must yield status 404."""
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    r = self.session.post(self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
                          data=json.dumps(payload_metadata))
    self.assertEqual(r.status_code, 404)
    # Note: errors should probably contain json (see #5988)
    # self.assertIn('Could not find domain ', r.json()['error'])
623
def test_create_slave_zone(self):
    """Create a Slave zone without nameservers and check it is listed and retrievable."""
    # Test that nameservers can be absent for slave zones.
    name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    for k in ('name', 'masters', 'kind'):
        self.assertIn(k, data)
        self.assertEqual(data[k], payload[k])
    print("payload:", payload)
    print("data:", data)
    # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
    r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    zonelist = r.json()
    print("zonelist:", zonelist)
    self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
    # Also test that fetching the zone works.
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
    data = r.json()
    print("zone (fetched):", data)
    for k in ('name', 'masters', 'kind'):
        self.assertIn(k, data)
        self.assertEqual(data[k], payload[k])
    self.assertEqual(data['serial'], 0)
    self.assertEqual(data['rrsets'], [])
646
def test_find_zone_by_name(self):
    """Listing zones with '?zone=<name>' must return the named zone first."""
    name = 'foo/' + unique_zone_name()
    name, payload, data = self.create_zone(name=name)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones?zone=" + name))
    data = r.json()
    print(data)
    self.assertEqual(data[0]['name'], name)
654
def test_delete_slave_zone(self):
    """Create a Slave zone and DELETE it; an HTTP error status fails the test."""
    _, _, created = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    zone_url = self.url("/api/v1/servers/localhost/zones/" + created['id'])
    response = self.session.delete(zone_url)
    response.raise_for_status()
659
def test_retrieve_slave_zone(self):
    """PUT to axfr-retrieve on a Slave zone must report a queued retrieval request."""
    name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    print("payload:", payload)
    print("data:", data)
    retrieve_url = self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/axfr-retrieve")
    data = self.session.put(retrieve_url).json()
    print("status for axfr-retrieve:", data)
    expected_result = u"Added retrieval request for '" + payload['name'] + "' from master 127.0.0.2"
    self.assertEqual(data['result'], expected_result)
669
def test_notify_master_zone(self):
    """PUT to notify on a Master zone must answer 'Notification queued'."""
    name, payload, data = self.create_zone(kind='Master')
    print("payload:", payload)
    print("data:", data)
    notify_url = self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/notify")
    data = self.session.put(notify_url).json()
    print("status for notify:", data)
    self.assertEqual(data['result'], 'Notification queued')
678
def test_get_zone_with_symbols(self):
    """A zone whose name contains '/' must be retrievable via its '=2F'-escaped id."""
    name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
    name = payload['name']
    zone_id = (name.replace('/', '=2F'))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id))
    data = r.json()
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
689
def test_get_zone(self):
    """Look up 'example.com.' in the zone list and fetch it by its id."""
    r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    domains = r.json()
    # Raises IndexError if no listed zone is named 'example.com.'.
    example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id']))
    self.assert_success_json(r)
    data = r.json()
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
        self.assertIn(k, data)
    self.assertEqual(data['name'], 'example.com.')
700
def test_import_zone_broken(self):
    """Importing the zone text below must be rejected with status 422."""
    payload = {
        'name': 'powerdns-broken.com',
        'kind': 'Master',
        'nameservers': [],
    }
    # NOTE(review): the second line of the text lacks the leading ';;' that
    # the other header lines have - presumably that is the deliberate
    # breakage; confirm.
    payload['zone'] = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1680
;; QUESTION SECTION:
;powerdns.com. IN SOA

;; ANSWER SECTION:
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
powerdns-broken.com. 86400 IN A 82.94.213.34
powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
731
def test_import_zone_axfr_outofzone(self):
    """Zone text holding a record for another domain must be rejected with status 422."""
    # Ensure we don't create out-of-zone records
    payload = {
        'name': unique_zone_name(),
        'kind': 'Master',
        'nameservers': [],
    }
    # %NAME% is replaced with the generated zone name; the example.org.
    # line is the out-of-zone record.
    payload['zone'] = """
%NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
%NAME% 3600 IN NS powerdnssec2.ds9a.nl.
example.org. 3600 IN AAAA 2001:888:2000:1d::2
%NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
""".replace('%NAME%', payload['name'])
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertEqual(r.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
751
def test_import_zone_axfr(self):
    """Import AXFR-style zone text for powerdns.com. and compare the reported rrsets.

    Also checks that the NS content stored in the DB has no trailing dot.
    """
    zone_text = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1680
;; QUESTION SECTION:
;powerdns.com. IN SOA

;; ANSWER SECTION:
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
powerdns.com. 86400 IN A 82.94.213.34
powerdns.com. 3600 IN MX 0 xs.powerdns.com.
powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
    payload = {
        'name': 'powerdns.com.',
        'kind': 'Master',
        'nameservers': [],
        'soa_edit_api': '',  # turn off so exact SOA comparison works.
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
    data = response.json()
    self.assertIn('name', data)

    # Types whose entries carry a 'name' are compared including the owner name.
    expected = {
        'NS': [
            {'content': 'powerdnssec1.ds9a.nl.'},
            {'content': 'powerdnssec2.ds9a.nl.'},
        ],
        'SOA': [
            {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
        ],
        'MX': [
            {'content': '0 xs.powerdns.com.'},
        ],
        'A': [
            {'content': '82.94.213.34', 'name': 'powerdns.com.'},
        ],
        'AAAA': [
            {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
        ],
    }

    eq_zone_rrsets(data['rrsets'], expected)

    # check content in DB is stored WITHOUT trailing dot.
    ns_rows = get_db_records(payload['name'], 'NS')
    sec1_rows = (row for row in ns_rows if row['content'].startswith('powerdnssec1'))
    sec1_row = next(sec1_rows)
    self.assertEqual(sec1_row['content'], 'powerdnssec1.ds9a.nl')
811
def test_import_zone_bind(self):
    """Import a BIND-style zone file for example.org. and compare the reported rrsets."""
    # Leading whitespace matters in this text: a record line that starts
    # with blanks reuses the owner name of the previous record.
    zone_text = """
$TTL 86400 ; 24 hours could have been written as 24h or 1d
; $TTL used for all RRs without explicit TTL value
$ORIGIN example.org.
@ 1D IN SOA ns1.example.org. hostmaster.example.org. (
              2002022401 ; serial
              3H ; refresh
              15 ; retry
              1w ; expire
              3h ; minimum
             )
       IN NS ns1.example.org. ; in the domain
       IN NS ns2.smokeyjoe.com. ; external to domain
       IN MX 10 mail.another.com. ; external mail provider
; server host definitions
ns1 IN A 192.168.0.1 ;name server definition
www IN A 192.168.0.2 ;web server definition
ftp IN CNAME www.example.org. ;ftp server definition
; non server domain hosts
bill IN A 192.168.0.3
fred IN A 192.168.0.4
"""
    payload = {
        'name': 'example.org.',
        'kind': 'Master',
        'nameservers': [],
        'soa_edit_api': '',  # turn off so exact SOA comparison works.
        'zone': zone_text,
    }
    response = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
    data = response.json()
    self.assertIn('name', data)

    # The SOA timers are expected back in seconds (3H -> 10800, 1w -> 604800).
    expected = {
        'NS': [
            {'content': 'ns1.example.org.'},
            {'content': 'ns2.smokeyjoe.com.'},
        ],
        'SOA': [
            {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
        ],
        'MX': [
            {'content': '10 mail.another.com.'},
        ],
        'A': [
            {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
            {'content': '192.168.0.2', 'name': 'www.example.org.'},
            {'content': '192.168.0.3', 'name': 'bill.example.org.'},
            {'content': '192.168.0.4', 'name': 'fred.example.org.'},
        ],
        'CNAME': [
            {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
        ],
    }

    eq_zone_rrsets(data['rrsets'], expected)
872
def test_import_zone_bind_cname_apex(self):
    """Zone text with a CNAME next to SOA/NS at the zone name must be rejected with status 422."""
    payload = {
        'name': unique_zone_name(),
        'kind': 'Master',
        'nameservers': [],
    }
    # %NAME% is replaced with the generated zone name.
    payload['zone'] = """
$ORIGIN %NAME%
@ IN SOA ns1.example.org. hostmaster.example.org. (2002022401 3H 15 1W 3H)
@ IN NS ns1.example.org.
@ IN NS ns2.smokeyjoe.com.
@ IN CNAME www.example.org.
""".replace('%NAME%', payload['name'])
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Conflicts with another RRset', r.json()['error'])
892
def test_export_zone_json(self):
    """Export a zone asking for JSON and compare the lines of the 'zone' field."""
    name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    r = self.session.get(
        self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
        headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
    )
    self.assert_success_json(r)
    data = r.json()
    self.assertIn('zone', data)
    # One tab-separated line per record: two NS records, then the SOA.
    expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
                     name + '\t3600\tIN\tNS\tns2.foo.com.',
                     name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
                     ' 0 10800 3600 604800 3600']
    self.assertEqual(data['zone'].strip().split('\n'), expected_data)
908
def test_export_zone_text(self):
    """Export a freshly created zone as plain text and compare the lines.

    The stripped response body, split on newlines, must equal the two NS
    lines followed by the SOA line.
    """
    name, _, _ = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    r = self.session.get(
        self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
        headers={'accept': '*/*'}
    )
    # Fail on the HTTP status first, so an error response is not reported
    # as a confusing zone-text mismatch.
    self.assert_success(r)
    data = r.text.strip().split("\n")
    expected_data = [
        name + '\t3600\tIN\tNS\tns1.foo.com.',
        name + '\t3600\tIN\tNS\tns2.foo.com.',
        name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
        ' 0 10800 3600 604800 3600',
    ]
    self.assertEqual(data, expected_data)
922
def test_update_zone(self):
    """Update zone settings via PUT and read them back via GET.

    The zone is first switched to Master (with masters and EPOCH SOA-EDIT
    settings), then back to Native with both SOA-EDIT settings emptied.
    After each PUT, every key that was sent must be present in the zone
    returned by GET with the same value.
    """
    _, payload, _ = self.create_zone()
    name = payload['name']
    zone_path = "/api/v1/servers/localhost/zones/" + name
    updates = (
        # update, set as Master and enable SOA-EDIT-API
        {
            'kind': 'Master',
            'masters': ['192.0.2.1', '192.0.2.2'],
            'soa_edit_api': 'EPOCH',
            'soa_edit': 'EPOCH'
        },
        # update, back to Native and empty(off)
        {
            'kind': 'Native',
            'soa_edit_api': '',
            'soa_edit': ''
        },
    )
    for update in updates:
        r = self.session.put(
            self.url(zone_path),
            data=json.dumps(update),
            headers={'content-type': 'application/json'})
        self.assert_success(r)
        data = self.session.get(self.url(zone_path)).json()
        for key, value in update.items():
            self.assertIn(key, data)
            self.assertEqual(data[key], value)
957
def test_zone_rr_update(self):
    """Replace the apex NS rrset and verify the stored records.

    One enabled and one disabled NS record are sent; the records read
    back for the rrset must equal exactly what was sent.
    """
    name, _, _ = self.create_zone()
    # do a replace (= update)
    # NOTE(review): the type is sent lower-case ('ns') but read back as 'NS';
    # presumably this exercises case-insensitive type handling - confirm.
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'ns',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            },
            {
                "content": "ns2-disabled.bar.com.",
                "disabled": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that (only) the new records are there
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertEqual(get_rrset(data, name, 'NS')['records'], rrset['records'])
986
def test_zone_rr_update_mx(self):
    """Replace the apex MX rrset and verify the stored record.

    Important to test with MX records, as they have a priority field,
    which must end up in the content field.
    """
    name, _, _ = self.create_zone()
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'MX',
        'ttl': 3600,
        'records': [
            {
                "content": "10 mail.example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that (only) the new record is there
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset['records'])
1012
def test_zone_rr_update_invalid_mx(self):
    """An MX record whose target contains '@' must be rejected.

    The PATCH must fail with 422 and a 'non-hostname content' error, and
    the zone must not contain an MX rrset afterwards.
    """
    name, _, _ = self.create_zone()
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'MX',
        'ttl': 3600,
        'records': [
            {
                "content": "10 mail@mx.example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('non-hostname content', r.json()['error'])
    # the rejected rrset must not have been stored
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertIsNone(get_rrset(data, name, 'MX'))
1037
def test_zone_rr_update_opt(self):
    """Replacing an rrset of type OPT must be rejected with 422."""
    name, _, _ = self.create_zone()
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'OPT',
        'ttl': 3600,
        'records': [
            {
                "content": "9",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('OPT: invalid type given', r.json()['error'])
1060
def test_zone_rr_update_multiple_rrsets(self):
    """Replace an NS and an MX rrset in one PATCH and verify both."""
    name, _, _ = self.create_zone()
    rrset1 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns9999.example.com.",
                "disabled": False
            }
        ]
    }
    rrset2 = {
        'changetype': 'replace',
        'name': name,
        'type': 'MX',
        'ttl': 3600,
        'records': [
            {
                "content": "10 mx444.example.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset1, rrset2]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that all rrsets have been updated
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertEqual(get_rrset(data, name, 'NS')['records'], rrset1['records'])
    self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset2['records'])
1098
def test_zone_rr_update_duplicate_record(self):
    """An rrset that lists the same record twice must be rejected.

    'ns9999.example.com.' appears as both the first and the last record.
    """
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {"content": "ns9999.example.com.", "disabled": False},
            {"content": "ns9996.example.com.", "disabled": False},
            {"content": "ns9987.example.com.", "disabled": False},
            {"content": "ns9988.example.com.", "disabled": False},
            {"content": "ns9999.example.com.", "disabled": False},
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Duplicate record in RRset', r.json()['error'])
1121
def test_zone_rr_update_duplicate_rrset(self):
    """Two rrsets with the same name and type in one PATCH must be rejected."""
    name, _, _ = self.create_zone()
    rrset1 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns9999.example.com.",
                "disabled": False
            }
        ]
    }
    rrset2 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns9998.example.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset1, rrset2]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Duplicate RRset', r.json()['error'])
1155
def test_zone_rr_delete(self):
    """Delete the apex NS rrset and check that it is gone from the zone."""
    name, payload, zone = self.create_zone()
    zone_path = "/api/v1/servers/localhost/zones/" + name
    # the NS records were created together with the zone; remove them all
    ns_removal = {'changetype': 'delete', 'name': name, 'type': 'NS'}
    body = json.dumps({'rrsets': [ns_removal]})
    response = self.session.patch(
        self.url(zone_path),
        data=body,
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # a fresh read of the zone must no longer contain an NS rrset
    zone_after = self.session.get(self.url(zone_path)).json()
    self.assertIsNone(get_rrset(zone_after, name, 'NS'))
1173
def test_zone_disable_reenable(self):
    """Disable and re-enable a zone through its SOA record.

    This also tests that SOA-EDIT-API works: the SOA is sent with serial
    '1', and after each PATCH the stored serial must differ from '1'; the
    second stored serial must also differ from the first.
    """
    name, _, _ = self.create_zone(soa_edit_api='EPOCH')
    zone_path = "/api/v1/servers/localhost/zones/" + name
    # disable zone by disabling SOA
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'SOA',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
                "disabled": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url(zone_path),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # check SOA serial has been edited
    data = self.session.get(self.url(zone_path)).json()
    soa_serial1 = get_first_rec(data, name, 'SOA')['content'].split()[2]
    self.assertNotEqual(soa_serial1, '1')
    # make sure domain is still in zone list (disabled SOA!)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    domains = r.json()
    self.assertEqual(len([domain for domain in domains if domain['name'] == name]), 1)
    # sleep 1sec to ensure the EPOCH value changes for the next request
    time.sleep(1)
    # verify that modifying it still works
    rrset['records'][0]['disabled'] = False
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url(zone_path),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # check SOA serial has been edited again
    data = self.session.get(self.url(zone_path)).json()
    soa_serial2 = get_first_rec(data, name, 'SOA')['content'].split()[2]
    self.assertNotEqual(soa_serial2, '1')
    self.assertNotEqual(soa_serial2, soa_serial1)
1219
def test_zone_rr_update_out_of_zone(self):
    """Replacing an rrset whose name is outside the zone must be rejected."""
    name, _, _ = self.create_zone()
    # replace with qname mismatch
    rrset = {
        'changetype': 'replace',
        'name': 'not-in-zone.',
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('out of zone', r.json()['error'])
1242
def test_zone_rr_update_restricted_chars(self):
    """An rrset name containing ':' must be rejected with 422."""
    name, _, _ = self.create_zone()
    # replace with a name that is prefixed by 'test:'
    rrset = {
        'changetype': 'replace',
        'name': 'test:' + name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('contains unsupported characters', r.json()['error'])
1265
def test_rrset_unknown_type(self):
    """Replacing an rrset of the made-up type 'FAFAFA' must be rejected."""
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'FAFAFA',
        'ttl': 3600,
        'records': [
            {
                "content": "4.3.2.1",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('unknown type', r.json()['error'])
1285
@parameterized.expand([
    ('CNAME', ),
    ('DNAME', ),
])
def test_rrset_exclusive_and_other(self, qtype):
    """Adding a CNAME/DNAME at the zone apex must be rejected.

    The rrset is sent for the zone name itself right after zone creation;
    the API must answer 422 with a pre-existing-RRset conflict error.
    """
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": "example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Conflicts with pre-existing RRset', r.json()['error'])
1309
@parameterized.expand([
    ('CNAME', ),
    ('DNAME', ),
])
def test_rrset_other_and_exclusive(self, qtype):
    """Adding an A record next to an existing CNAME/DNAME must be rejected.

    The CNAME/DNAME at 'sub.<zone>' is accepted first; a following A rrset
    at the same name must fail with 422 and a conflict error.
    """
    name, _, _ = self.create_zone()
    zone_path = "/api/v1/servers/localhost/zones/" + name
    rrset = {
        'changetype': 'replace',
        'name': 'sub.'+name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": "example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url(zone_path),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    rrset = {
        'changetype': 'replace',
        'name': 'sub.'+name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": "1.2.3.4",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url(zone_path),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Conflicts with pre-existing RRset', r.json()['error'])
1349
@parameterized.expand([
    ('SOA', ['ns1.example.org. test@example.org. 10 10800 3600 604800 3600', 'ns2.example.org. test@example.org. 10 10800 3600 604800 3600']),
    ('CNAME', ['01.example.org.', '02.example.org.']),
    ('DNAME', ['01.example.org.', '02.example.org.']),
])
def test_rrset_single_qtypes(self, qtype, contents):
    """An SOA/CNAME/DNAME rrset with more than one record must be rejected.

    ``contents`` holds the record contents to send (two per parameter set);
    the API must answer 422 and name the offending type in the error.
    """
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': 'sub.'+name,
        'type': qtype,
        'ttl': 3600,
        # one enabled record per supplied content
        'records': [{"content": content, "disabled": False} for content in contents]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('IN ' + qtype + ' has more than one record', r.json()['error'])
1378
def test_create_zone_with_leading_space(self):
    """An A record whose content starts with a space must be rejected.

    Actual regression.
    """
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": " 4.3.2.1",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Not in expected format', r.json()['error'])
1399
def test_zone_rr_delete_out_of_zone(self):
    """Deleting an rrset whose name lies outside the zone must succeed."""
    name, payload, zone = self.create_zone()
    out_of_zone_delete = {'changetype': 'delete', 'name': 'not-in-zone.', 'type': 'NS'}
    body = json.dumps({'rrsets': [out_of_zone_delete]})
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=body,
        headers={'content-type': 'application/json'})
    print(response.content)
    # succeed so users can fix their wrong, old data
    self.assert_success(response)
1414
def test_zone_delete(self):
    """Deleting a zone must return 204 without a Content-Type header."""
    name, _, _ = self.create_zone()
    r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
    self.assertEqual(r.status_code, 204)
    self.assertNotIn('Content-Type', r.headers)
1420
def test_zone_comment_create(self):
    """Attach two comments to the apex NS rrset without touching records.

    Afterwards the rrset must still have records, must have comments with
    a non-zero 'modified_at' on the first one, and must report TTL 3600.
    """
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'comments': [
            {
                'account': 'test1',
                'content': 'blah blah',
            },
            {
                'account': 'test2',
                'content': 'blah blah bleh',
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # make sure the comments have been set, and that the NS
    # records are still present
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    serverset = get_rrset(data, name, 'NS')
    print(serverset)
    self.assertNotEqual(serverset['records'], [])
    self.assertNotEqual(serverset['comments'], [])
    # verify that modified_at has been set by pdns
    self.assertNotEqual(serverset['comments'][0]['modified_at'], 0)
    # verify that TTL is correct (regression test)
    self.assertEqual(serverset['ttl'], 3600)
1456
def test_zone_comment_delete(self):
    """Send an empty comment list for the apex NS rrset.

    Test: Delete ONLY comments. The records must remain and the comment
    list read back must be empty.
    """
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'comments': []
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # make sure the NS records are still present
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    serverset = get_rrset(data, name, 'NS')
    print(serverset)
    self.assertNotEqual(serverset['records'], [])
    self.assertEqual(serverset['comments'], [])
1478
def test_zone_comment_out_of_range_modified_at(self):
    """A comment with an out-of-range 'modified_at' must be rejected.

    The value sent is the string '4294967297' (2**32 + 1); the API must
    answer 422 with an out-of-range error for that key.
    """
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'comments': [
            {
                'account': 'test1',
                'content': 'oh hi there',
                'modified_at': '4294967297'
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn("Value for key 'modified_at' is out of range", r.json()['error'])
1501
def test_zone_comment_stay_intact(self):
    """Test if comments on an rrset stay intact if the rrset is replaced.

    A comment is set on the apex NS rrset first; then the records of the
    same rrset are replaced without sending comments. Both the new records
    and the original comment must be read back.
    """
    name, _, _ = self.create_zone()
    zone_path = "/api/v1/servers/localhost/zones/" + name
    # create a comment
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'comments': [
            {
                'account': 'test1',
                'content': 'oh hi there',
                'modified_at': 1111
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url(zone_path),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # replace rrset records
    rrset2 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            }
        ]
    }
    payload2 = {'rrsets': [rrset2]}
    r = self.session.patch(
        self.url(zone_path),
        data=json.dumps(payload2),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # make sure the comments still exist
    data = self.session.get(self.url(zone_path)).json()
    serverset = get_rrset(data, name, 'NS')
    print(serverset)
    self.assertEqual(serverset['records'], rrset2['records'])
    self.assertEqual(serverset['comments'], rrset['comments'])
1549
def test_zone_auto_ptr_ipv4_create(self):
    """Create a zone with an A record flagged 'set-ptr' and check the PTR.

    The reverse zone '4.2.192.in-addr.arpa.' is created first. After the
    forward zone is created, the reverse zone must contain exactly one PTR
    rrset pointing at the forward zone name, and its serial must be
    greater than the serial it had when it was created.
    """
    revzone = '4.2.192.in-addr.arpa.'
    _, _, revzonedata = self.create_zone(name=revzone)
    name = unique_zone_name()
    rrset = {
        "name": name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "192.2.4.44",
            "disabled": False,
            "set-ptr": True,
        }],
    }
    name, _, data = self.create_zone(name=name, rrsets=[rrset])
    # 'set-ptr' is removed from the expectation before comparing with the
    # records read back from the API
    del rrset['records'][0]['set-ptr']
    self.assertEqual(get_rrset(data, name, 'A')['records'], rrset['records'])
    revdata = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
    revsets = [s for s in revdata['rrsets'] if s['type'] == 'PTR']
    print(revsets)
    self.assertEqual(revsets, [{
        u'name': u'44.4.2.192.in-addr.arpa.',
        u'ttl': 3600,
        u'type': u'PTR',
        u'comments': [],
        u'records': [{
            u'content': name,
            u'disabled': False,
        }],
    }])
    # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
    self.assertGreater(revdata['serial'], revzonedata['serial'])
1582
def test_zone_auto_ptr_ipv4_update(self):
    """PATCH an A record flagged 'set-ptr' and check the generated PTR.

    The reverse zone '0.2.192.in-addr.arpa.' is created first. After the
    PATCH, it must contain exactly one PTR rrset pointing at the forward
    zone name, and its serial must be greater than at creation.
    """
    revzone = '0.2.192.in-addr.arpa.'
    _, _, revzonedata = self.create_zone(name=revzone)
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": '192.2.0.2',
                "disabled": False,
                "set-ptr": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    revdata = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
    revsets = [s for s in revdata['rrsets'] if s['type'] == 'PTR']
    print(revsets)
    self.assertEqual(revsets, [{
        u'name': u'2.0.2.192.in-addr.arpa.',
        u'ttl': 3600,
        u'type': u'PTR',
        u'comments': [],
        u'records': [{
            u'content': name,
            u'disabled': False,
        }],
    }])
    # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
    self.assertGreater(revdata['serial'], revzonedata['serial'])
1621
def test_zone_auto_ptr_ipv6_update(self):
    """PATCH an AAAA record flagged 'set-ptr' and check the generated PTR.

    The reverse zone '8.b.d.0.1.0.0.2.ip6.arpa.' is created first. After
    the PATCH of 2001:DB8::bb:aa, it must contain exactly one PTR rrset
    pointing at the forward zone name, and its serial must be greater than
    at creation.
    """
    # 2001:DB8::bb:aa
    revzone = '8.b.d.0.1.0.0.2.ip6.arpa.'
    _, _, revzonedata = self.create_zone(name=revzone)
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'AAAA',
        'ttl': 3600,
        'records': [
            {
                "content": '2001:DB8::bb:aa',
                "disabled": False,
                "set-ptr": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    revdata = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
    revsets = [s for s in revdata['rrsets'] if s['type'] == 'PTR']
    print(revsets)
    self.assertEqual(revsets, [{
        u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
        u'ttl': 3600,
        u'type': u'PTR',
        u'comments': [],
        u'records': [{
            u'content': name,
            u'disabled': False,
        }],
    }])
    # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
    self.assertGreater(revdata['serial'], revzonedata['serial'])
1661
def test_search_rr_exact_zone(self):
    """Search for the exact zone name (without trailing dot).

    The result must be the zone entry followed by the two NS records and
    the SOA record, in that order.
    """
    name = unique_zone_name()
    self.create_zone(name=name, serial=22, soa_edit_api='')

    def record(qtype, content):
        # expected search hit for one record at the zone apex
        return {u'content': content,
                u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
                u'ttl': 3600, u'type': qtype, u'name': name}

    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')))
    self.assert_success_json(r)
    print(r.json())
    self.assertEqual(r.json(), [
        {u'object_type': u'zone', u'name': name, u'zone_id': name},
        record(u'NS', u'ns1.example.com.'),
        record(u'NS', u'ns2.example.com.'),
        record(u'SOA', u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600'),
    ])
1680
def test_search_rr_exact_zone_filter_type_zone(self):
    """Search for the exact zone name with object_type=zone.

    Only the zone entry must be returned.
    """
    name = unique_zone_name()
    data_type = "zone"
    self.create_zone(name=name, serial=22, soa_edit_api='')
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
    self.assert_success_json(r)
    print(r.json())
    self.assertEqual(r.json(), [
        {u'object_type': u'zone', u'name': name, u'zone_id': name},
    ])
1691
def test_search_rr_exact_zone_filter_type_record(self):
    """Search for the exact zone name with object_type=record.

    Only the two NS records and the SOA record must be returned, in that
    order.
    """
    name = unique_zone_name()
    data_type = "record"
    self.create_zone(name=name, serial=22, soa_edit_api='')

    def record(qtype, content):
        # expected search hit for one record at the zone apex
        return {u'content': content,
                u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
                u'ttl': 3600, u'type': qtype, u'name': name}

    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
    self.assert_success_json(r)
    print(r.json())
    self.assertEqual(r.json(), [
        record(u'NS', u'ns1.example.com.'),
        record(u'NS', u'ns2.example.com.'),
        record(u'SOA', u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600'),
    ])
1710
def test_search_rr_substring(self):
    """Wildcard search on a substring of the zone name returns four hits."""
    name = unique_zone_name()
    # drop the first and last five characters of the name
    search = name[5:-5]
    self.create_zone(name=name)
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
    self.assert_success_json(r)
    print(r.json())
    # should return zone, SOA, ns1, ns2
    self.assertEqual(len(r.json()), 4)
1720
def test_search_rr_case_insensitive(self):
    """Wildcard search must match regardless of letter case.

    The zone name ends in 'testsuffix.' while the query uses 'testSUFFIX'.
    """
    name = unique_zone_name()+'testsuffix.'
    self.create_zone(name=name)
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
    self.assert_success_json(r)
    print(r.json())
    # should return zone, SOA, ns1, ns2
    self.assertEqual(len(r.json()), 4)
1729
def test_search_after_rectify_with_ent(self):
    """Search after rectifying a zone that has a record at 'sub.sub.<zone>'.

    Five hits are expected; per the original comment, the empty
    non-terminal (ENT) must not show up in the results.
    """
    name = unique_zone_name()
    # first label of the zone name, used as the wildcard search term
    search = name.split('.')[0]
    rrset = {
        "name": 'sub.sub.' + name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "4.3.2.1",
            "disabled": False,
        }],
    }
    self.create_zone(name=name, rrsets=[rrset])
    pdnsutil_rectify(name)
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
    self.assert_success_json(r)
    print(r.json())
    # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
    self.assertEqual(len(r.json()), 5)
1749
def test_default_api_rectify(self):
    """A DNSSEC zone created without 'api_rectify' gets ordernames set.

    After creation, the first AAAA row fetched from the database must have
    a non-None 'ordername'.
    """
    name = unique_zone_name()
    rrsets = [
        {
            "name": 'a.' + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::1",
                "disabled": False,
            }],
        },
        {
            "name": 'b.' + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::2",
                "disabled": False,
            }],
        },
    ]
    self.create_zone(name=name, rrsets=rrsets, dnssec=True, nsec3param='1 0 1 ab')
    dbrecs = get_db_records(name, 'AAAA')
    self.assertIsNotNone(dbrecs[0]['ordername'])
1776
def test_override_api_rectify(self):
    """A DNSSEC zone created with api_rectify=False gets no ordernames.

    After creation, the first AAAA row fetched from the database must have
    'ordername' set to None.
    """
    name = unique_zone_name()
    rrsets = [
        {
            "name": 'a.' + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::1",
                "disabled": False,
            }],
        },
        {
            "name": 'b.' + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::2",
                "disabled": False,
            }],
        },
    ]
    self.create_zone(name=name, rrsets=rrsets, api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
    dbrecs = get_db_records(name, 'AAAA')
    self.assertIsNone(dbrecs[0]['ordername'])
1803
def test_cname_at_ent_place(self):
    """Add a CNAME at a name that only exists as a parent of another record.

    An A record is added at 'sub2.sub1.<zone>' first, then a CNAME at
    'sub1.<zone>' (presumably an empty non-terminal at that point, per the
    test name). Both PATCH requests must return 204.
    """
    name, _, zone = self.create_zone(dnssec=True, api_rectify=True)
    rrsets = (
        {
            'changetype': 'replace',
            'name': 'sub2.sub1.' + name,
            'type': "A",
            'ttl': 3600,
            'records': [{
                'content': "4.3.2.1",
                'disabled': False,
            }],
        },
        {
            'changetype': 'replace',
            'name': 'sub1.' + name,
            'type': "CNAME",
            'ttl': 3600,
            'records': [{
                'content': "www.example.org.",
                'disabled': False,
            }],
        },
    )
    # send the rrsets one request at a time, in order
    for rrset in rrsets:
        payload = {'rrsets': [rrset]}
        r = self.session.patch(
            self.url("/api/v1/servers/localhost/zones/" + zone['id']),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assertEqual(r.status_code, 204)
1838
def test_rrset_parameter_post_false(self):
    """Creating a zone with '?rrsets=false' must omit 'rrsets' in the reply.

    The POST must succeed with 201 and a JSON body without an 'rrsets' key
    (or with it set to null).
    """
    name = unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.', 'ns2.example.com.']
    }
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones?rrsets=false"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    # check the response before decoding it for the debug print, so a
    # non-JSON error response fails on the assertion rather than in r.json()
    self.assert_success_json(r)
    print(r.json())
    self.assertEqual(r.status_code, 201)
    self.assertIsNone(r.json().get('rrsets'))
1854
def test_rrset_false_parameter(self):
    """Fetching a zone with '?rrsets=false' must omit 'rrsets' in the reply."""
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false"))
    self.assert_success_json(r)
    print(r.json())
    self.assertIsNone(r.json().get('rrsets'))
1862
def test_rrset_true_parameter(self):
    """Fetching a zone with '?rrsets=true' must return its two rrsets."""
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true"))
    self.assert_success_json(r)
    print(r.json())
    # presumably the SOA and NS rrsets of the fresh zone - confirm
    self.assertEqual(len(r.json().get('rrsets')), 2)
1870
def test_wrong_rrset_parameter(self):
    """An unsupported value for the 'rrsets' parameter must yield 422."""
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=foobar"))
    self.assertEqual(r.status_code, 422)
    self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
1877
def test_put_master_tsig_key_ids_non_existent(self):
    """Setting 'master_tsig_key_ids' to an unknown key name must yield 422."""
    name = unique_zone_name()
    # a fresh unique label, used as a key name that was never created here
    keyname = unique_zone_name().split('.')[0]
    self.create_zone(name=name, kind='Native')
    payload = {
        'master_tsig_key_ids': [keyname]
    }
    r = self.session.put(
        self.url('/api/v1/servers/localhost/zones/' + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('A TSIG key with the name', r.json()['error'])
1890
def test_put_slave_tsig_key_ids_non_existent(self):
    # Setting slave_tsig_key_ids to a key name that was never created
    # must be rejected with 422 and a TSIG-key error message.
    name = unique_zone_name()
    # First label of a fresh unique zone name serves as a key name that
    # should not exist on the server.
    keyname = unique_zone_name().split('.')[0]
    self.create_zone(name=name, kind='Native')
    payload = {
        'slave_tsig_key_ids': [keyname]
    }
    r = self.session.put(
        self.url('/api/v1/servers/localhost/zones/' + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    # assertEqual instead of the deprecated assertEquals alias.
    self.assertEqual(r.status_code, 422)
    self.assertIn('A TSIG key with the name', r.json()['error'])
1903
1904
@unittest.skipIf(not is_auth(), "Not applicable")
class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
    """API tests that create and update the root zone ('.').

    The tests address the root zone in URLs through the id '=2E', which
    is also the id test_create_zone expects the server to report.
    """

    # Zone id under which the root zone is addressed in API URLs.
    _ROOT_ZONE_ID = '=2E'

    def setUp(self):
        super(AuthRootZone, self).setUp()
        # zone name is not unique, so delete the zone before each individual test.
        self.session.delete(self.url("/api/v1/servers/localhost/zones/" + self._ROOT_ZONE_ID))

    def _update_and_check(self, payload):
        # PUT `payload` onto the root zone, then re-fetch the zone and check
        # that every submitted field is reported back unchanged.
        zone_url = self.url("/api/v1/servers/localhost/zones/" + self._ROOT_ZONE_ID)
        r = self.session.put(
            zone_url,
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success(r)
        data = self.session.get(zone_url).json()
        for k, expected in payload.items():
            self.assertIn(k, data)
            self.assertEqual(data[k], expected)

    def test_create_zone(self):
        # Create the root zone, then check the creation reply, the generated
        # SOA record, the zone list and a fetch of the zone by id.
        _, payload, data = self.create_zone(name='.', serial=22, soa_edit_api='')
        for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
            self.assertIn(k, data)
            if k in payload:
                self.assertEqual(data[k], payload[k])
        # validate generated SOA
        rec = get_first_rec(data, '.', 'SOA')
        # get_first_rec returns None when no matching rrset exists; fail with
        # an assertion rather than a TypeError on the subscript below.
        self.assertIsNotNone(rec)
        self.assertEqual(
            rec['content'],
            "a.misconfigured.powerdns.server. hostmaster. " + str(payload['serial']) +
            " 10800 3600 604800 3600"
        )
        # Regression test: verify zone list works
        zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
        print("zonelist:", zonelist)
        self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
        # Also test that fetching the zone works.
        print("id:", data['id'])
        self.assertEqual(data['id'], self._ROOT_ZONE_ID)
        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
        print("zone (fetched):", data)
        for k in ('name', 'kind'):
            self.assertIn(k, data)
            self.assertEqual(data[k], payload[k])
        self.assertEqual(data['rrsets'][0]['name'], '.')

    def test_update_zone(self):
        # Create the root zone, switch it to Master with SOA-EDIT settings,
        # then switch it back to Native with those settings emptied.
        self.create_zone(name='.')
        # update, set as Master and enable SOA-EDIT-API
        self._update_and_check({
            'kind': 'Master',
            'masters': ['192.0.2.1', '192.0.2.2'],
            'soa_edit_api': 'EPOCH',
            'soa_edit': 'EPOCH'
        })
        # update, back to Native and empty(off)
        self._update_and_check({
            'kind': 'Native',
            'soa_edit_api': '',
            'soa_edit': ''
        })
1974
1975
@unittest.skipIf(not is_recursor(), "Not applicable")
class RecursorZones(ApiTestCase):
    """Zone API tests that only run against the recursor."""

    def create_zone(self, name=None, kind=None, rd=False, servers=None):
        """Create a zone through the API and return (payload, reply).

        name    -- zone name; a fresh unique name is generated when None
        kind    -- value sent as 'kind'
        rd      -- value sent as 'recursion_desired'
        servers -- list sent as 'servers'; an empty list when None

        The POST must succeed with a JSON body. Returns the submitted
        payload dict and the decoded JSON reply.
        """
        if name is None:
            name = unique_zone_name()
        if servers is None:
            servers = []
        payload = {
            'name': name,
            'kind': kind,
            'servers': servers,
            'recursion_desired': rd
        }
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success_json(r)
        return payload, r.json()

    def _assert_payload_echoed(self, payload, data):
        # Every field submitted in `payload` must come back unchanged in `data`.
        for k, expected in payload.items():
            self.assertEqual(data[k], expected)

    def test_create_auth_zone(self):
        # A Native zone's creation reply echoes the submitted fields.
        payload, data = self.create_zone(kind='Native')
        self._assert_payload_echoed(payload, data)

    def test_create_zone_no_name(self):
        # An empty zone name must be rejected with 422.
        payload = {
            'name': '',
            'kind': 'Native',
            'servers': ['8.8.8.8'],
            'recursion_desired': False,
        }
        print(payload)
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assertEqual(r.status_code, 422)
        self.assertIn('is not canonical', r.json()['error'])

    def test_create_forwarded_zone(self):
        # Forwarded zone without recursion desired.
        payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
        # return values are normalized
        payload['servers'][0] += ':53'
        self._assert_payload_echoed(payload, data)

    def test_create_forwarded_rd_zone(self):
        # Forwarded zone with recursion desired.
        payload, data = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
        # return values are normalized
        payload['servers'][0] += ':53'
        self._assert_payload_echoed(payload, data)

    def test_create_auth_zone_with_symbols(self):
        # A '/' in the zone name must appear as '=2F' in the zone id.
        payload, data = self.create_zone(name='foo/bar.' + unique_zone_name(), kind='Native')
        expected_id = payload['name'].replace('/', '=2F')
        self._assert_payload_echoed(payload, data)
        self.assertEqual(data['id'], expected_id)

    def test_rename_auth_zone(self):
        # PUT a new name onto an existing zone, then fetch it under that name.
        payload, _ = self.create_zone(kind='Native')
        name = payload['name']
        # now rename it
        payload = {
            'name': 'renamed-' + name,
            'kind': 'Native',
            'recursion_desired': False
        }
        r = self.session.put(
            self.url("/api/v1/servers/localhost/zones/" + name),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success(r)
        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + payload['name'])).json()
        self._assert_payload_echoed(payload, data)

    def test_zone_delete(self):
        # Deleting a zone answers 204 without a Content-Type header.
        payload, _ = self.create_zone(kind='Native')
        name = payload['name']
        r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
        self.assertEqual(r.status_code, 204)
        self.assertNotIn('Content-Type', r.headers)

    def test_search_rr_exact_zone(self):
        # Searching for the exact zone name returns a single zone entry.
        name = unique_zone_name()
        self.create_zone(name=name, kind='Native')
        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name))
        self.assert_success_json(r)
        results = r.json()
        print(results)
        self.assertEqual(results, [{u'type': u'zone', u'name': name, u'zone_id': name}])

    def test_search_rr_substring(self):
        # Searching for a substring of the zone name returns two results.
        name = 'search-rr-zone.name.'
        self.create_zone(name=name, kind='Native')
        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
        self.assert_success_json(r)
        results = r.json()
        print(results)
        # should return zone, SOA
        self.assertEqual(len(results), 2)
2079
2080 @unittest.skipIf(not is_auth(), "Not applicable")
2081 class AuthZoneKeys(ApiTestCase, AuthZonesHelperMixin):
2082
2083 def test_get_keys(self):
2084 r = self.session.get(
2085 self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2086 self.assert_success_json(r)
2087 keys = r.json()
2088 self.assertGreater(len(keys), 0)
2089
2090 key0 = deepcopy(keys[0])
2091 del key0['dnskey']
2092 del key0['ds']
2093 expected = {
2094 u'algorithm': u'ECDSAP256SHA256',
2095 u'bits': 256,
2096 u'active': True,
2097 u'type': u'Cryptokey',
2098 u'keytype': u'csk',
2099 u'flags': 257,
2100 u'id': 1}
2101 self.assertEquals(key0, expected)
2102
2103 keydata = keys[0]['dnskey'].split()
2104 self.assertEqual(len(keydata), 4)