]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
Merge pull request #8474 from omoerbeek/auth-fix-logging-no-cache
[thirdparty/pdns.git] / regression-tests.api / test_Zones.py
1 from __future__ import print_function
2 import json
3 import time
4 import unittest
5 from copy import deepcopy
6 from parameterized import parameterized
7 from pprint import pprint
8 from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor, get_db_records, pdnsutil_rectify
9
10
11 def get_rrset(data, qname, qtype):
12 for rrset in data['rrsets']:
13 if rrset['name'] == qname and rrset['type'] == qtype:
14 return rrset
15 return None
16
17
18 def get_first_rec(data, qname, qtype):
19 rrset = get_rrset(data, qname, qtype)
20 if rrset:
21 return rrset['records'][0]
22 return None
23
24
25 def eq_zone_rrsets(rrsets, expected):
26 data_got = {}
27 data_expected = {}
28 for type_, expected_records in expected.items():
29 type_ = str(type_)
30 data_got[type_] = set()
31 data_expected[type_] = set()
32 uses_name = any(['name' in expected_record for expected_record in expected_records])
33 # minify + convert received data
34 for rrset in [rrset for rrset in rrsets if rrset['type'] == type_]:
35 print(rrset)
36 for r in rrset['records']:
37 data_got[type_].add((rrset['name'] if uses_name else '@', rrset['type'], r['content']))
38 # minify expected data
39 for r in expected_records:
40 data_expected[type_].add((r['name'] if uses_name else '@', type_, r['content']))
41
42 print("eq_zone_rrsets: got:")
43 pprint(data_got)
44 print("eq_zone_rrsets: expected:")
45 pprint(data_expected)
46
47 assert data_got == data_expected, "%r != %r" % (data_got, data_expected)
48
49
50 class Zones(ApiTestCase):
51
52 def _test_list_zones(self, dnssec=True):
53 path = "/api/v1/servers/localhost/zones"
54 if not dnssec:
55 path = path + "?dnssec=false"
56 r = self.session.get(self.url(path))
57 self.assert_success_json(r)
58 domains = r.json()
59 example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
60 self.assertEquals(len(example_com), 1)
61 example_com = example_com[0]
62 print(example_com)
63 required_fields = ['id', 'url', 'name', 'kind']
64 if is_auth():
65 required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial', 'account']
66 if dnssec:
67 required_fields = required_fields = ['dnssec', 'edited_serial']
68 self.assertNotEquals(example_com['serial'], 0)
69 if not dnssec:
70 self.assertNotIn('dnssec', example_com)
71 elif is_recursor():
72 required_fields = required_fields + ['recursion_desired', 'servers']
73 for field in required_fields:
74 self.assertIn(field, example_com)
75
76 def test_list_zones_with_dnssec(self):
77 if is_auth():
78 self._test_list_zones(True)
79
80 def test_list_zones_without_dnssec(self):
81 self._test_list_zones(False)
82
83 class AuthZonesHelperMixin(object):
84 def create_zone(self, name=None, **kwargs):
85 if name is None:
86 name = unique_zone_name()
87 payload = {
88 'name': name,
89 'kind': 'Native',
90 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
91 }
92 for k, v in kwargs.items():
93 if v is None:
94 del payload[k]
95 else:
96 payload[k] = v
97 print("sending", payload)
98 r = self.session.post(
99 self.url("/api/v1/servers/localhost/zones"),
100 data=json.dumps(payload),
101 headers={'content-type': 'application/json'})
102 self.assert_success_json(r)
103 self.assertEquals(r.status_code, 201)
104 reply = r.json()
105 print("reply", reply)
106 return name, payload, reply
107
108
109 @unittest.skipIf(not is_auth(), "Not applicable")
110 class AuthZones(ApiTestCase, AuthZonesHelperMixin):
111
112 def test_create_zone(self):
113 # soa_edit_api has a default, override with empty for this test
114 name, payload, data = self.create_zone(serial=22, soa_edit_api='')
115 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'edited_serial', 'soa_edit_api', 'soa_edit', 'account'):
116 self.assertIn(k, data)
117 if k in payload:
118 self.assertEquals(data[k], payload[k])
119 # validate generated SOA
120 expected_soa = "a.misconfigured.powerdns.server. hostmaster." + name + " " + \
121 str(payload['serial']) + " 10800 3600 604800 3600"
122 self.assertEquals(
123 get_first_rec(data, name, 'SOA')['content'],
124 expected_soa
125 )
126 # Because we had confusion about dots, check that the DB is without dots.
127 dbrecs = get_db_records(name, 'SOA')
128 self.assertEqual(dbrecs[0]['content'], expected_soa.replace('. ', ' '))
129 self.assertNotEquals(data['serial'], data['edited_serial'])
130
131 def test_create_zone_with_soa_edit_api(self):
132 # soa_edit_api wins over serial
133 name, payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
134 for k in ('soa_edit_api', ):
135 self.assertIn(k, data)
136 if k in payload:
137 self.assertEquals(data[k], payload[k])
138 # generated EPOCH serial surely is > fixed serial we passed in
139 print(data)
140 self.assertGreater(data['serial'], payload['serial'])
141 soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
142 self.assertGreater(soa_serial, payload['serial'])
143 self.assertEquals(soa_serial, data['serial'])
144
145 def test_create_zone_with_account(self):
146 # soa_edit_api wins over serial
147 name, payload, data = self.create_zone(account='anaccount', serial=10)
148 print(data)
149 for k in ('account', ):
150 self.assertIn(k, data)
151 if k in payload:
152 self.assertEquals(data[k], payload[k])
153
154 def test_create_zone_default_soa_edit_api(self):
155 name, payload, data = self.create_zone()
156 print(data)
157 self.assertEquals(data['soa_edit_api'], 'DEFAULT')
158
159 def test_create_zone_exists(self):
160 name, payload, data = self.create_zone()
161 print(data)
162 payload = {
163 'name': name,
164 'kind': 'Native'
165 }
166 print(payload)
167 r = self.session.post(
168 self.url("/api/v1/servers/localhost/zones"),
169 data=json.dumps(payload),
170 headers={'content-type': 'application/json'})
171 self.assertEquals(r.status_code, 409) # Conflict - already exists
172
173 def test_create_zone_with_soa_edit(self):
174 name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
175 print(data)
176 self.assertEquals(data['soa_edit'], 'INCEPTION-INCREMENT')
177 self.assertEquals(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
178 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
179 # These particular settings lead to the first serial set to YYYYMMDD01.
180 self.assertEquals(soa_serial[-2:], '01')
181 rrset = {
182 'changetype': 'replace',
183 'name': name,
184 'type': 'A',
185 'ttl': 3600,
186 'records': [
187 {
188 "content": "127.0.0.1",
189 "disabled": False
190 }
191 ]
192 }
193 payload = {'rrsets': [rrset]}
194 self.session.patch(
195 self.url("/api/v1/servers/localhost/zones/" + data['id']),
196 data=json.dumps(payload),
197 headers={'content-type': 'application/json'})
198 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
199 data = r.json()
200 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
201 self.assertEquals(soa_serial[-2:], '02')
202
203 def test_create_zone_with_records(self):
204 name = unique_zone_name()
205 rrset = {
206 "name": name,
207 "type": "A",
208 "ttl": 3600,
209 "records": [{
210 "content": "4.3.2.1",
211 "disabled": False,
212 }],
213 }
214 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
215 # check our record has appeared
216 self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
217
218 def test_create_zone_with_wildcard_records(self):
219 name = unique_zone_name()
220 rrset = {
221 "name": "*."+name,
222 "type": "A",
223 "ttl": 3600,
224 "records": [{
225 "content": "4.3.2.1",
226 "disabled": False,
227 }],
228 }
229 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
230 # check our record has appeared
231 self.assertEquals(get_rrset(data, rrset['name'], 'A')['records'], rrset['records'])
232
233 def test_create_zone_with_comments(self):
234 name = unique_zone_name()
235 rrsets = [
236 {
237 "name": name,
238 "type": "soa", # test uppercasing of type, too.
239 "comments": [{
240 "account": "test1",
241 "content": "blah blah",
242 "modified_at": 11112,
243 }],
244 },
245 {
246 "name": name,
247 "type": "AAAA",
248 "ttl": 3600,
249 "records": [{
250 "content": "2001:DB8::1",
251 "disabled": False,
252 }],
253 "comments": [{
254 "account": "test AAAA",
255 "content": "blah blah AAAA",
256 "modified_at": 11112,
257 }],
258 },
259 {
260 "name": name,
261 "type": "TXT",
262 "ttl": 3600,
263 "records": [{
264 "content": "\"test TXT\"",
265 "disabled": False,
266 }],
267 },
268 {
269 "name": name,
270 "type": "A",
271 "ttl": 3600,
272 "records": [{
273 "content": "192.0.2.1",
274 "disabled": False,
275 }],
276 },
277 ]
278 name, payload, data = self.create_zone(name=name, rrsets=rrsets)
279 # NS records have been created
280 self.assertEquals(len(data['rrsets']), len(rrsets) + 1)
281 # check our comment has appeared
282 self.assertEquals(get_rrset(data, name, 'SOA')['comments'], rrsets[0]['comments'])
283 self.assertEquals(get_rrset(data, name, 'A')['comments'], [])
284 self.assertEquals(get_rrset(data, name, 'TXT')['comments'], [])
285 self.assertEquals(get_rrset(data, name, 'AAAA')['comments'], rrsets[1]['comments'])
286
287 def test_create_zone_uncanonical_nameservers(self):
288 name = unique_zone_name()
289 payload = {
290 'name': name,
291 'kind': 'Native',
292 'nameservers': ['uncanon.example.com']
293 }
294 print(payload)
295 r = self.session.post(
296 self.url("/api/v1/servers/localhost/zones"),
297 data=json.dumps(payload),
298 headers={'content-type': 'application/json'})
299 self.assertEquals(r.status_code, 422)
300 self.assertIn('Nameserver is not canonical', r.json()['error'])
301
302 def test_create_auth_zone_no_name(self):
303 name = unique_zone_name()
304 payload = {
305 'name': '',
306 'kind': 'Native',
307 }
308 print(payload)
309 r = self.session.post(
310 self.url("/api/v1/servers/localhost/zones"),
311 data=json.dumps(payload),
312 headers={'content-type': 'application/json'})
313 self.assertEquals(r.status_code, 422)
314 self.assertIn('is not canonical', r.json()['error'])
315
316 def test_create_zone_with_custom_soa(self):
317 name = unique_zone_name()
318 content = u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
319 rrset = {
320 "name": name,
321 "type": "soa", # test uppercasing of type, too.
322 "ttl": 3600,
323 "records": [{
324 "content": content,
325 "disabled": False,
326 }],
327 }
328 name, payload, data = self.create_zone(name=name, rrsets=[rrset], soa_edit_api='')
329 self.assertEquals(get_rrset(data, name, 'SOA')['records'], rrset['records'])
330 dbrecs = get_db_records(name, 'SOA')
331 self.assertEqual(dbrecs[0]['content'], content.replace('. ', ' '))
332
333 def test_create_zone_double_dot(self):
334 name = 'test..' + unique_zone_name()
335 payload = {
336 'name': name,
337 'kind': 'Native',
338 'nameservers': ['ns1.example.com.']
339 }
340 print(payload)
341 r = self.session.post(
342 self.url("/api/v1/servers/localhost/zones"),
343 data=json.dumps(payload),
344 headers={'content-type': 'application/json'})
345 self.assertEquals(r.status_code, 422)
346 self.assertIn('Unable to parse DNS Name', r.json()['error'])
347
348 def test_create_zone_restricted_chars(self):
349 name = 'test:' + unique_zone_name() # : isn't good as a name.
350 payload = {
351 'name': name,
352 'kind': 'Native',
353 'nameservers': ['ns1.example.com']
354 }
355 print(payload)
356 r = self.session.post(
357 self.url("/api/v1/servers/localhost/zones"),
358 data=json.dumps(payload),
359 headers={'content-type': 'application/json'})
360 self.assertEquals(r.status_code, 422)
361 self.assertIn('contains unsupported characters', r.json()['error'])
362
363 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self):
364 name = unique_zone_name()
365 rrset = {
366 "name": name,
367 "type": "NS",
368 "ttl": 3600,
369 "records": [{
370 "content": "ns2.example.com.",
371 "disabled": False,
372 }],
373 }
374 payload = {
375 'name': name,
376 'kind': 'Native',
377 'nameservers': ['ns1.example.com.'],
378 'rrsets': [rrset],
379 }
380 print(payload)
381 r = self.session.post(
382 self.url("/api/v1/servers/localhost/zones"),
383 data=json.dumps(payload),
384 headers={'content-type': 'application/json'})
385 self.assertEquals(r.status_code, 422)
386 self.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r.json()['error'])
387
388 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self):
389 name = unique_zone_name()
390 rrset = {
391 "name": 'subzone.'+name,
392 "type": "NS",
393 "ttl": 3600,
394 "records": [{
395 "content": "ns2.example.com.",
396 "disabled": False,
397 }],
398 }
399 payload = {
400 'name': name,
401 'kind': 'Native',
402 'nameservers': ['ns1.example.com.'],
403 'rrsets': [rrset],
404 }
405 print(payload)
406 r = self.session.post(
407 self.url("/api/v1/servers/localhost/zones"),
408 data=json.dumps(payload),
409 headers={'content-type': 'application/json'})
410 self.assert_success_json(r)
411
412 def test_create_zone_with_symbols(self):
413 name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
414 name = payload['name']
415 expected_id = name.replace('/', '=2F')
416 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
417 self.assertIn(k, data)
418 if k in payload:
419 self.assertEquals(data[k], payload[k])
420 self.assertEquals(data['id'], expected_id)
421 dbrecs = get_db_records(name, 'SOA')
422 self.assertEqual(dbrecs[0]['name'], name.rstrip('.'))
423
424 def test_create_zone_with_nameservers_non_string(self):
425 # ensure we don't crash
426 name = unique_zone_name()
427 payload = {
428 'name': name,
429 'kind': 'Native',
430 'nameservers': [{'a': 'ns1.example.com'}] # invalid
431 }
432 print(payload)
433 r = self.session.post(
434 self.url("/api/v1/servers/localhost/zones"),
435 data=json.dumps(payload),
436 headers={'content-type': 'application/json'})
437 self.assertEquals(r.status_code, 422)
438
439 def test_create_zone_with_dnssec(self):
440 """
441 Create a zone with "dnssec" set and see if a key was made.
442 """
443 name = unique_zone_name()
444 name, payload, data = self.create_zone(dnssec=True)
445
446 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
447
448 for k in ('dnssec', ):
449 self.assertIn(k, data)
450 if k in payload:
451 self.assertEquals(data[k], payload[k])
452
453 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
454
455 keys = r.json()
456
457 print(keys)
458
459 self.assertEquals(r.status_code, 200)
460 self.assertEquals(len(keys), 1)
461 self.assertEquals(keys[0]['type'], 'Cryptokey')
462 self.assertEquals(keys[0]['active'], True)
463 self.assertEquals(keys[0]['keytype'], 'csk')
464
465 def test_create_zone_with_dnssec_disable_dnssec(self):
466 """
467 Create a zone with "dnssec", then set "dnssec" to false and see if the
468 keys are gone
469 """
470 name = unique_zone_name()
471 name, payload, data = self.create_zone(dnssec=True)
472
473 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
474 data=json.dumps({'dnssec': False}))
475 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
476
477 zoneinfo = r.json()
478
479 self.assertEquals(r.status_code, 200)
480 self.assertEquals(zoneinfo['dnssec'], False)
481
482 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
483
484 keys = r.json()
485
486 self.assertEquals(r.status_code, 200)
487 self.assertEquals(len(keys), 0)
488
489 def test_create_zone_with_nsec3param(self):
490 """
491 Create a zone with "nsec3param" set and see if the metadata was added.
492 """
493 name = unique_zone_name()
494 nsec3param = '1 0 500 aabbccddeeff'
495 name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)
496
497 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
498
499 for k in ('dnssec', 'nsec3param'):
500 self.assertIn(k, data)
501 if k in payload:
502 self.assertEquals(data[k], payload[k])
503
504 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3PARAM'))
505
506 data = r.json()
507
508 print(data)
509
510 self.assertEquals(r.status_code, 200)
511 self.assertEquals(len(data['metadata']), 1)
512 self.assertEquals(data['kind'], 'NSEC3PARAM')
513 self.assertEquals(data['metadata'][0], nsec3param)
514
515 def test_create_zone_with_nsec3narrow(self):
516 """
517 Create a zone with "nsec3narrow" set and see if the metadata was added.
518 """
519 name = unique_zone_name()
520 nsec3param = '1 0 500 aabbccddeeff'
521 name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
522 nsec3narrow=True)
523
524 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
525
526 for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
527 self.assertIn(k, data)
528 if k in payload:
529 self.assertEquals(data[k], payload[k])
530
531 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3NARROW'))
532
533 data = r.json()
534
535 print(data)
536
537 self.assertEquals(r.status_code, 200)
538 self.assertEquals(len(data['metadata']), 1)
539 self.assertEquals(data['kind'], 'NSEC3NARROW')
540 self.assertEquals(data['metadata'][0], '1')
541
542 def test_create_zone_dnssec_serial(self):
543 """
544 Create a zone set/unset "dnssec" and see if the serial was increased
545 after every step
546 """
547 name = unique_zone_name()
548 name, payload, data = self.create_zone()
549
550 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
551 self.assertEquals(soa_serial[-2:], '01')
552
553 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
554 data=json.dumps({'dnssec': True}))
555 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
556
557 data = r.json()
558 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
559
560 self.assertEquals(r.status_code, 200)
561 self.assertEquals(soa_serial[-2:], '02')
562
563 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
564 data=json.dumps({'dnssec': False}))
565 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
566
567 data = r.json()
568 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
569
570 self.assertEquals(r.status_code, 200)
571 self.assertEquals(soa_serial[-2:], '03')
572
573 def test_zone_absolute_url(self):
574 name, payload, data = self.create_zone()
575 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
576 rdata = r.json()
577 print(rdata[0])
578 self.assertTrue(rdata[0]['url'].startswith('/api/v'))
579
580 def test_create_zone_metadata(self):
581 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
582 r = self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
583 data=json.dumps(payload_metadata))
584 rdata = r.json()
585 self.assertEquals(r.status_code, 201)
586 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
587
588 def test_create_zone_metadata_kind(self):
589 payload_metadata = {"metadata": ["127.0.0.2"]}
590 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
591 data=json.dumps(payload_metadata))
592 rdata = r.json()
593 self.assertEquals(r.status_code, 200)
594 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
595
596 def test_create_protected_zone_metadata(self):
597 # test whether it prevents modification of certain kinds
598 for k in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
599 payload = {"metadata": ["FOO", "BAR"]}
600 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k),
601 data=json.dumps(payload))
602 self.assertEquals(r.status_code, 422)
603
604 def test_retrieve_zone_metadata(self):
605 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
606 self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
607 data=json.dumps(payload_metadata))
608 r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata"))
609 rdata = r.json()
610 self.assertEquals(r.status_code, 200)
611 self.assertIn(payload_metadata, rdata)
612
613 def test_delete_zone_metadata(self):
614 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
615 self.assertEquals(r.status_code, 200)
616 r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
617 rdata = r.json()
618 self.assertEquals(r.status_code, 200)
619 self.assertEquals(rdata["metadata"], [])
620
621 def test_create_external_zone_metadata(self):
622 payload_metadata = {"metadata": ["My very important message"]}
623 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
624 data=json.dumps(payload_metadata))
625 self.assertEquals(r.status_code, 200)
626 rdata = r.json()
627 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
628
629 def test_create_metadata_in_non_existent_zone(self):
630 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
631 r = self.session.post(self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
632 data=json.dumps(payload_metadata))
633 self.assertEquals(r.status_code, 404)
634 # Note: errors should probably contain json (see #5988)
635 # self.assertIn('Could not find domain ', r.json()['error'])
636
637 def test_create_slave_zone(self):
638 # Test that nameservers can be absent for slave zones.
639 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
640 for k in ('name', 'masters', 'kind'):
641 self.assertIn(k, data)
642 self.assertEquals(data[k], payload[k])
643 print("payload:", payload)
644 print("data:", data)
645 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
646 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
647 zonelist = r.json()
648 print("zonelist:", zonelist)
649 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
650 # Also test that fetching the zone works.
651 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
652 data = r.json()
653 print("zone (fetched):", data)
654 for k in ('name', 'masters', 'kind'):
655 self.assertIn(k, data)
656 self.assertEquals(data[k], payload[k])
657 self.assertEqual(data['serial'], 0)
658 self.assertEqual(data['rrsets'], [])
659
660 def test_find_zone_by_name(self):
661 name = 'foo/' + unique_zone_name()
662 name, payload, data = self.create_zone(name=name)
663 r = self.session.get(self.url("/api/v1/servers/localhost/zones?zone=" + name))
664 data = r.json()
665 print(data)
666 self.assertEquals(data[0]['name'], name)
667
668 def test_delete_slave_zone(self):
669 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
670 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + data['id']))
671 r.raise_for_status()
672
673 def test_retrieve_slave_zone(self):
674 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
675 print("payload:", payload)
676 print("data:", data)
677 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/axfr-retrieve"))
678 data = r.json()
679 print("status for axfr-retrieve:", data)
680 self.assertEqual(data['result'], u'Added retrieval request for \'' + payload['name'] +
681 '\' from master 127.0.0.2')
682
683 def test_notify_master_zone(self):
684 name, payload, data = self.create_zone(kind='Master')
685 print("payload:", payload)
686 print("data:", data)
687 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/notify"))
688 data = r.json()
689 print("status for notify:", data)
690 self.assertEqual(data['result'], 'Notification queued')
691
692 def test_get_zone_with_symbols(self):
693 name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
694 name = payload['name']
695 zone_id = (name.replace('/', '=2F'))
696 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id))
697 data = r.json()
698 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
699 self.assertIn(k, data)
700 if k in payload:
701 self.assertEquals(data[k], payload[k])
702
703 def test_get_zone(self):
704 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
705 domains = r.json()
706 example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
707 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id']))
708 self.assert_success_json(r)
709 data = r.json()
710 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
711 self.assertIn(k, data)
712 self.assertEquals(data['name'], 'example.com.')
713
714 def test_import_zone_broken(self):
715 payload = {
716 'name': 'powerdns-broken.com',
717 'kind': 'Master',
718 'nameservers': [],
719 }
720 payload['zone'] = """
721 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
722 flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
723 ;; WARNING: recursion requested but not available
724
725 ;; OPT PSEUDOSECTION:
726 ; EDNS: version: 0, flags:; udp: 1680
727 ;; QUESTION SECTION:
728 ;powerdns.com. IN SOA
729
730 ;; ANSWER SECTION:
731 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
732 powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
733 powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
734 powerdns-broken.com. 86400 IN A 82.94.213.34
735 powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
736 powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
737 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
738 """
739 r = self.session.post(
740 self.url("/api/v1/servers/localhost/zones"),
741 data=json.dumps(payload),
742 headers={'content-type': 'application/json'})
743 self.assertEquals(r.status_code, 422)
744
745 def test_import_zone_axfr_outofzone(self):
746 # Ensure we don't create out-of-zone records
747 payload = {
748 'name': unique_zone_name(),
749 'kind': 'Master',
750 'nameservers': [],
751 }
752 payload['zone'] = """
753 %NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
754 %NAME% 3600 IN NS powerdnssec2.ds9a.nl.
755 example.org. 3600 IN AAAA 2001:888:2000:1d::2
756 %NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
757 """.replace('%NAME%', payload['name'])
758 r = self.session.post(
759 self.url("/api/v1/servers/localhost/zones"),
760 data=json.dumps(payload),
761 headers={'content-type': 'application/json'})
762 self.assertEquals(r.status_code, 422)
763 self.assertEqual(r.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
764
765 def test_import_zone_axfr(self):
766 payload = {
767 'name': 'powerdns.com.',
768 'kind': 'Master',
769 'nameservers': [],
770 'soa_edit_api': '', # turn off so exact SOA comparison works.
771 }
772 payload['zone'] = """
773 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
774 ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
775 ;; WARNING: recursion requested but not available
776
777 ;; OPT PSEUDOSECTION:
778 ; EDNS: version: 0, flags:; udp: 1680
779 ;; QUESTION SECTION:
780 ;powerdns.com. IN SOA
781
782 ;; ANSWER SECTION:
783 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
784 powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
785 powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
786 powerdns.com. 86400 IN A 82.94.213.34
787 powerdns.com. 3600 IN MX 0 xs.powerdns.com.
788 powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
789 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
790 """
791 r = self.session.post(
792 self.url("/api/v1/servers/localhost/zones"),
793 data=json.dumps(payload),
794 headers={'content-type': 'application/json'})
795 self.assert_success_json(r)
796 data = r.json()
797 self.assertIn('name', data)
798
799 expected = {
800 'NS': [
801 {'content': 'powerdnssec1.ds9a.nl.'},
802 {'content': 'powerdnssec2.ds9a.nl.'},
803 ],
804 'SOA': [
805 {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
806 ],
807 'MX': [
808 {'content': '0 xs.powerdns.com.'},
809 ],
810 'A': [
811 {'content': '82.94.213.34', 'name': 'powerdns.com.'},
812 ],
813 'AAAA': [
814 {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
815 ],
816 }
817
818 eq_zone_rrsets(data['rrsets'], expected)
819
820 # check content in DB is stored WITHOUT trailing dot.
821 dbrecs = get_db_records(payload['name'], 'NS')
822 dbrec = next((dbrec for dbrec in dbrecs if dbrec['content'].startswith('powerdnssec1')))
823 self.assertEqual(dbrec['content'], 'powerdnssec1.ds9a.nl')
824
825 def test_import_zone_bind(self):
826 payload = {
827 'name': 'example.org.',
828 'kind': 'Master',
829 'nameservers': [],
830 'soa_edit_api': '', # turn off so exact SOA comparison works.
831 }
832 payload['zone'] = """
833 $TTL 86400 ; 24 hours could have been written as 24h or 1d
834 ; $TTL used for all RRs without explicit TTL value
835 $ORIGIN example.org.
836 @ 1D IN SOA ns1.example.org. hostmaster.example.org. (
837 2002022401 ; serial
838 3H ; refresh
839 15 ; retry
840 1w ; expire
841 3h ; minimum
842 )
843 IN NS ns1.example.org. ; in the domain
844 IN NS ns2.smokeyjoe.com. ; external to domain
845 IN MX 10 mail.another.com. ; external mail provider
846 ; server host definitions
847 ns1 IN A 192.168.0.1 ;name server definition
848 www IN A 192.168.0.2 ;web server definition
849 ftp IN CNAME www.example.org. ;ftp server definition
850 ; non server domain hosts
851 bill IN A 192.168.0.3
852 fred IN A 192.168.0.4
853 """
854 r = self.session.post(
855 self.url("/api/v1/servers/localhost/zones"),
856 data=json.dumps(payload),
857 headers={'content-type': 'application/json'})
858 self.assert_success_json(r)
859 data = r.json()
860 self.assertIn('name', data)
861
862 expected = {
863 'NS': [
864 {'content': 'ns1.example.org.'},
865 {'content': 'ns2.smokeyjoe.com.'},
866 ],
867 'SOA': [
868 {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
869 ],
870 'MX': [
871 {'content': '10 mail.another.com.'},
872 ],
873 'A': [
874 {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
875 {'content': '192.168.0.2', 'name': 'www.example.org.'},
876 {'content': '192.168.0.3', 'name': 'bill.example.org.'},
877 {'content': '192.168.0.4', 'name': 'fred.example.org.'},
878 ],
879 'CNAME': [
880 {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
881 ],
882 }
883
884 eq_zone_rrsets(data['rrsets'], expected)
885
886 def test_import_zone_bind_cname_apex(self):
887 payload = {
888 'name': unique_zone_name(),
889 'kind': 'Master',
890 'nameservers': [],
891 }
892 payload['zone'] = """
893 $ORIGIN %NAME%
894 @ IN SOA ns1.example.org. hostmaster.example.org. (2002022401 3H 15 1W 3H)
895 @ IN NS ns1.example.org.
896 @ IN NS ns2.smokeyjoe.com.
897 @ IN CNAME www.example.org.
898 """.replace('%NAME%', payload['name'])
899 r = self.session.post(
900 self.url("/api/v1/servers/localhost/zones"),
901 data=json.dumps(payload),
902 headers={'content-type': 'application/json'})
903 self.assertEquals(r.status_code, 422)
904 self.assertIn('Conflicts with another RRset', r.json()['error'])
905
906 def test_export_zone_json(self):
907 name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
908 # export it
909 r = self.session.get(
910 self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
911 headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
912 )
913 self.assert_success_json(r)
914 data = r.json()
915 self.assertIn('zone', data)
916 expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
917 name + '\t3600\tIN\tNS\tns2.foo.com.',
918 name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
919 ' 0 10800 3600 604800 3600']
920 self.assertEquals(data['zone'].strip().split('\n'), expected_data)
921
922 def test_export_zone_text(self):
923 name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
924 # export it
925 r = self.session.get(
926 self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
927 headers={'accept': '*/*'}
928 )
929 data = r.text.strip().split("\n")
930 expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
931 name + '\t3600\tIN\tNS\tns2.foo.com.',
932 name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
933 ' 0 10800 3600 604800 3600']
934 self.assertEquals(data, expected_data)
935
936 def test_update_zone(self):
937 name, payload, zone = self.create_zone()
938 name = payload['name']
939 # update, set as Master and enable SOA-EDIT-API
940 payload = {
941 'kind': 'Master',
942 'masters': ['192.0.2.1', '192.0.2.2'],
943 'soa_edit_api': 'EPOCH',
944 'soa_edit': 'EPOCH'
945 }
946 r = self.session.put(
947 self.url("/api/v1/servers/localhost/zones/" + name),
948 data=json.dumps(payload),
949 headers={'content-type': 'application/json'})
950 self.assert_success(r)
951 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
952 for k in payload.keys():
953 self.assertIn(k, data)
954 self.assertEquals(data[k], payload[k])
955 # update, back to Native and empty(off)
956 payload = {
957 'kind': 'Native',
958 'soa_edit_api': '',
959 'soa_edit': ''
960 }
961 r = self.session.put(
962 self.url("/api/v1/servers/localhost/zones/" + name),
963 data=json.dumps(payload),
964 headers={'content-type': 'application/json'})
965 self.assert_success(r)
966 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
967 for k in payload.keys():
968 self.assertIn(k, data)
969 self.assertEquals(data[k], payload[k])
970
971 def test_zone_rr_update(self):
972 name, payload, zone = self.create_zone()
973 # do a replace (= update)
974 rrset = {
975 'changetype': 'replace',
976 'name': name,
977 'type': 'ns',
978 'ttl': 3600,
979 'records': [
980 {
981 "content": "ns1.bar.com.",
982 "disabled": False
983 },
984 {
985 "content": "ns2-disabled.bar.com.",
986 "disabled": True
987 }
988 ]
989 }
990 payload = {'rrsets': [rrset]}
991 r = self.session.patch(
992 self.url("/api/v1/servers/localhost/zones/" + name),
993 data=json.dumps(payload),
994 headers={'content-type': 'application/json'})
995 self.assert_success(r)
996 # verify that (only) the new record is there
997 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
998 self.assertEquals(get_rrset(data, name, 'NS')['records'], rrset['records'])
999
1000 def test_zone_rr_update_mx(self):
1001 # Important to test with MX records, as they have a priority field, which must end up in the content field.
1002 name, payload, zone = self.create_zone()
1003 # do a replace (= update)
1004 rrset = {
1005 'changetype': 'replace',
1006 'name': name,
1007 'type': 'MX',
1008 'ttl': 3600,
1009 'records': [
1010 {
1011 "content": "10 mail.example.org.",
1012 "disabled": False
1013 }
1014 ]
1015 }
1016 payload = {'rrsets': [rrset]}
1017 r = self.session.patch(
1018 self.url("/api/v1/servers/localhost/zones/" + name),
1019 data=json.dumps(payload),
1020 headers={'content-type': 'application/json'})
1021 self.assert_success(r)
1022 # verify that (only) the new record is there
1023 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1024 self.assertEquals(get_rrset(data, name, 'MX')['records'], rrset['records'])
1025
1026 def test_zone_rr_update_invalid_mx(self):
1027 name, payload, zone = self.create_zone()
1028 # do a replace (= update)
1029 rrset = {
1030 'changetype': 'replace',
1031 'name': name,
1032 'type': 'MX',
1033 'ttl': 3600,
1034 'records': [
1035 {
1036 "content": "10 mail@mx.example.org.",
1037 "disabled": False
1038 }
1039 ]
1040 }
1041 payload = {'rrsets': [rrset]}
1042 r = self.session.patch(
1043 self.url("/api/v1/servers/localhost/zones/" + name),
1044 data=json.dumps(payload),
1045 headers={'content-type': 'application/json'})
1046 self.assertEquals(r.status_code, 422)
1047 self.assertIn('non-hostname content', r.json()['error'])
1048 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1049 self.assertIsNone(get_rrset(data, name, 'MX'))
1050
1051 def test_zone_rr_update_opt(self):
1052 name, payload, zone = self.create_zone()
1053 # do a replace (= update)
1054 rrset = {
1055 'changetype': 'replace',
1056 'name': name,
1057 'type': 'OPT',
1058 'ttl': 3600,
1059 'records': [
1060 {
1061 "content": "9",
1062 "disabled": False
1063 }
1064 ]
1065 }
1066 payload = {'rrsets': [rrset]}
1067 r = self.session.patch(
1068 self.url("/api/v1/servers/localhost/zones/" + name),
1069 data=json.dumps(payload),
1070 headers={'content-type': 'application/json'})
1071 self.assertEquals(r.status_code, 422)
1072 self.assertIn('OPT: invalid type given', r.json()['error'])
1073
1074 def test_zone_rr_update_multiple_rrsets(self):
1075 name, payload, zone = self.create_zone()
1076 rrset1 = {
1077 'changetype': 'replace',
1078 'name': name,
1079 'type': 'NS',
1080 'ttl': 3600,
1081 'records': [
1082 {
1083
1084 "content": "ns9999.example.com.",
1085 "disabled": False
1086 }
1087 ]
1088 }
1089 rrset2 = {
1090 'changetype': 'replace',
1091 'name': name,
1092 'type': 'MX',
1093 'ttl': 3600,
1094 'records': [
1095 {
1096 "content": "10 mx444.example.com.",
1097 "disabled": False
1098 }
1099 ]
1100 }
1101 payload = {'rrsets': [rrset1, rrset2]}
1102 r = self.session.patch(
1103 self.url("/api/v1/servers/localhost/zones/" + name),
1104 data=json.dumps(payload),
1105 headers={'content-type': 'application/json'})
1106 self.assert_success(r)
1107 # verify that all rrsets have been updated
1108 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1109 self.assertEquals(get_rrset(data, name, 'NS')['records'], rrset1['records'])
1110 self.assertEquals(get_rrset(data, name, 'MX')['records'], rrset2['records'])
1111
1112 def test_zone_rr_update_duplicate_record(self):
1113 name, payload, zone = self.create_zone()
1114 rrset = {
1115 'changetype': 'replace',
1116 'name': name,
1117 'type': 'NS',
1118 'ttl': 3600,
1119 'records': [
1120 {"content": "ns9999.example.com.", "disabled": False},
1121 {"content": "ns9996.example.com.", "disabled": False},
1122 {"content": "ns9987.example.com.", "disabled": False},
1123 {"content": "ns9988.example.com.", "disabled": False},
1124 {"content": "ns9999.example.com.", "disabled": False},
1125 ]
1126 }
1127 payload = {'rrsets': [rrset]}
1128 r = self.session.patch(
1129 self.url("/api/v1/servers/localhost/zones/" + name),
1130 data=json.dumps(payload),
1131 headers={'content-type': 'application/json'})
1132 self.assertEquals(r.status_code, 422)
1133 self.assertIn('Duplicate record in RRset', r.json()['error'])
1134
1135 def test_zone_rr_update_duplicate_rrset(self):
1136 name, payload, zone = self.create_zone()
1137 rrset1 = {
1138 'changetype': 'replace',
1139 'name': name,
1140 'type': 'NS',
1141 'ttl': 3600,
1142 'records': [
1143 {
1144 "content": "ns9999.example.com.",
1145 "disabled": False
1146 }
1147 ]
1148 }
1149 rrset2 = {
1150 'changetype': 'replace',
1151 'name': name,
1152 'type': 'NS',
1153 'ttl': 3600,
1154 'records': [
1155 {
1156 "content": "ns9998.example.com.",
1157 "disabled": False
1158 }
1159 ]
1160 }
1161 payload = {'rrsets': [rrset1, rrset2]}
1162 r = self.session.patch(
1163 self.url("/api/v1/servers/localhost/zones/" + name),
1164 data=json.dumps(payload),
1165 headers={'content-type': 'application/json'})
1166 self.assertEquals(r.status_code, 422)
1167 self.assertIn('Duplicate RRset', r.json()['error'])
1168
1169 def test_zone_rr_delete(self):
1170 name, payload, zone = self.create_zone()
1171 # do a delete of all NS records (these are created with the zone)
1172 rrset = {
1173 'changetype': 'delete',
1174 'name': name,
1175 'type': 'NS'
1176 }
1177 payload = {'rrsets': [rrset]}
1178 r = self.session.patch(
1179 self.url("/api/v1/servers/localhost/zones/" + name),
1180 data=json.dumps(payload),
1181 headers={'content-type': 'application/json'})
1182 self.assert_success(r)
1183 # verify that the records are gone
1184 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1185 self.assertIsNone(get_rrset(data, name, 'NS'))
1186
1187 def test_zone_disable_reenable(self):
1188 # This also tests that SOA-EDIT-API works.
1189 name, payload, zone = self.create_zone(soa_edit_api='EPOCH')
1190 # disable zone by disabling SOA
1191 rrset = {
1192 'changetype': 'replace',
1193 'name': name,
1194 'type': 'SOA',
1195 'ttl': 3600,
1196 'records': [
1197 {
1198 "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
1199 "disabled": True
1200 }
1201 ]
1202 }
1203 payload = {'rrsets': [rrset]}
1204 r = self.session.patch(
1205 self.url("/api/v1/servers/localhost/zones/" + name),
1206 data=json.dumps(payload),
1207 headers={'content-type': 'application/json'})
1208 self.assert_success(r)
1209 # check SOA serial has been edited
1210 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1211 soa_serial1 = get_first_rec(data, name, 'SOA')['content'].split()[2]
1212 self.assertNotEquals(soa_serial1, '1')
1213 # make sure domain is still in zone list (disabled SOA!)
1214 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
1215 domains = r.json()
1216 self.assertEquals(len([domain for domain in domains if domain['name'] == name]), 1)
1217 # sleep 1sec to ensure the EPOCH value changes for the next request
1218 time.sleep(1)
1219 # verify that modifying it still works
1220 rrset['records'][0]['disabled'] = False
1221 payload = {'rrsets': [rrset]}
1222 r = self.session.patch(
1223 self.url("/api/v1/servers/localhost/zones/" + name),
1224 data=json.dumps(payload),
1225 headers={'content-type': 'application/json'})
1226 self.assert_success(r)
1227 # check SOA serial has been edited again
1228 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1229 soa_serial2 = get_first_rec(data, name, 'SOA')['content'].split()[2]
1230 self.assertNotEquals(soa_serial2, '1')
1231 self.assertNotEquals(soa_serial2, soa_serial1)
1232
1233 def test_zone_rr_update_out_of_zone(self):
1234 name, payload, zone = self.create_zone()
1235 # replace with qname mismatch
1236 rrset = {
1237 'changetype': 'replace',
1238 'name': 'not-in-zone.',
1239 'type': 'NS',
1240 'ttl': 3600,
1241 'records': [
1242 {
1243 "content": "ns1.bar.com.",
1244 "disabled": False
1245 }
1246 ]
1247 }
1248 payload = {'rrsets': [rrset]}
1249 r = self.session.patch(
1250 self.url("/api/v1/servers/localhost/zones/" + name),
1251 data=json.dumps(payload),
1252 headers={'content-type': 'application/json'})
1253 self.assertEquals(r.status_code, 422)
1254 self.assertIn('out of zone', r.json()['error'])
1255
1256 def test_zone_rr_update_restricted_chars(self):
1257 name, payload, zone = self.create_zone()
1258 # replace with qname mismatch
1259 rrset = {
1260 'changetype': 'replace',
1261 'name': 'test:' + name,
1262 'type': 'NS',
1263 'ttl': 3600,
1264 'records': [
1265 {
1266 "content": "ns1.bar.com.",
1267 "disabled": False
1268 }
1269 ]
1270 }
1271 payload = {'rrsets': [rrset]}
1272 r = self.session.patch(
1273 self.url("/api/v1/servers/localhost/zones/" + name),
1274 data=json.dumps(payload),
1275 headers={'content-type': 'application/json'})
1276 self.assertEquals(r.status_code, 422)
1277 self.assertIn('contains unsupported characters', r.json()['error'])
1278
1279 def test_rrset_unknown_type(self):
1280 name, payload, zone = self.create_zone()
1281 rrset = {
1282 'changetype': 'replace',
1283 'name': name,
1284 'type': 'FAFAFA',
1285 'ttl': 3600,
1286 'records': [
1287 {
1288 "content": "4.3.2.1",
1289 "disabled": False
1290 }
1291 ]
1292 }
1293 payload = {'rrsets': [rrset]}
1294 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1295 headers={'content-type': 'application/json'})
1296 self.assertEquals(r.status_code, 422)
1297 self.assertIn('unknown type', r.json()['error'])
1298
1299 @parameterized.expand([
1300 ('CNAME', ),
1301 ('DNAME', ),
1302 ])
1303 def test_rrset_exclusive_and_other(self, qtype):
1304 name, payload, zone = self.create_zone()
1305 rrset = {
1306 'changetype': 'replace',
1307 'name': name,
1308 'type': qtype,
1309 'ttl': 3600,
1310 'records': [
1311 {
1312 "content": "example.org.",
1313 "disabled": False
1314 }
1315 ]
1316 }
1317 payload = {'rrsets': [rrset]}
1318 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1319 headers={'content-type': 'application/json'})
1320 self.assertEquals(r.status_code, 422)
1321 self.assertIn('Conflicts with pre-existing RRset', r.json()['error'])
1322
1323 @parameterized.expand([
1324 ('CNAME', ),
1325 ('DNAME', ),
1326 ])
1327 def test_rrset_other_and_exclusive(self, qtype):
1328 name, payload, zone = self.create_zone()
1329 rrset = {
1330 'changetype': 'replace',
1331 'name': 'sub.'+name,
1332 'type': qtype,
1333 'ttl': 3600,
1334 'records': [
1335 {
1336 "content": "example.org.",
1337 "disabled": False
1338 }
1339 ]
1340 }
1341 payload = {'rrsets': [rrset]}
1342 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1343 headers={'content-type': 'application/json'})
1344 self.assert_success(r)
1345 rrset = {
1346 'changetype': 'replace',
1347 'name': 'sub.'+name,
1348 'type': 'A',
1349 'ttl': 3600,
1350 'records': [
1351 {
1352 "content": "1.2.3.4",
1353 "disabled": False
1354 }
1355 ]
1356 }
1357 payload = {'rrsets': [rrset]}
1358 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1359 headers={'content-type': 'application/json'})
1360 self.assertEquals(r.status_code, 422)
1361 self.assertIn('Conflicts with pre-existing RRset', r.json()['error'])
1362
1363 @parameterized.expand([
1364 ('SOA', ['ns1.example.org. test@example.org. 10 10800 3600 604800 3600', 'ns2.example.org. test@example.org. 10 10800 3600 604800 3600']),
1365 ('CNAME', ['01.example.org.', '02.example.org.']),
1366 ('DNAME', ['01.example.org.', '02.example.org.']),
1367 ])
1368 def test_rrset_single_qtypes(self, qtype, contents):
1369 name, payload, zone = self.create_zone()
1370 rrset = {
1371 'changetype': 'replace',
1372 'name': 'sub.'+name,
1373 'type': qtype,
1374 'ttl': 3600,
1375 'records': [
1376 {
1377 "content": contents[0],
1378 "disabled": False
1379 },
1380 {
1381 "content": contents[1],
1382 "disabled": False
1383 }
1384 ]
1385 }
1386 payload = {'rrsets': [rrset]}
1387 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1388 headers={'content-type': 'application/json'})
1389 self.assertEquals(r.status_code, 422)
1390 self.assertIn('IN ' + qtype + ' has more than one record', r.json()['error'])
1391
1392 def test_create_zone_with_leading_space(self):
1393 # Actual regression.
1394 name, payload, zone = self.create_zone()
1395 rrset = {
1396 'changetype': 'replace',
1397 'name': name,
1398 'type': 'A',
1399 'ttl': 3600,
1400 'records': [
1401 {
1402 "content": " 4.3.2.1",
1403 "disabled": False
1404 }
1405 ]
1406 }
1407 payload = {'rrsets': [rrset]}
1408 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1409 headers={'content-type': 'application/json'})
1410 self.assertEquals(r.status_code, 422)
1411 self.assertIn('Not in expected format', r.json()['error'])
1412
1413 def test_zone_rr_delete_out_of_zone(self):
1414 name, payload, zone = self.create_zone()
1415 rrset = {
1416 'changetype': 'delete',
1417 'name': 'not-in-zone.',
1418 'type': 'NS'
1419 }
1420 payload = {'rrsets': [rrset]}
1421 r = self.session.patch(
1422 self.url("/api/v1/servers/localhost/zones/" + name),
1423 data=json.dumps(payload),
1424 headers={'content-type': 'application/json'})
1425 print(r.content)
1426 self.assert_success(r) # succeed so users can fix their wrong, old data
1427
1428 def test_zone_delete(self):
1429 name, payload, zone = self.create_zone()
1430 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
1431 self.assertEquals(r.status_code, 204)
1432 self.assertNotIn('Content-Type', r.headers)
1433
1434 def test_zone_comment_create(self):
1435 name, payload, zone = self.create_zone()
1436 rrset = {
1437 'changetype': 'replace',
1438 'name': name,
1439 'type': 'NS',
1440 'ttl': 3600,
1441 'comments': [
1442 {
1443 'account': 'test1',
1444 'content': 'blah blah',
1445 },
1446 {
1447 'account': 'test2',
1448 'content': 'blah blah bleh',
1449 }
1450 ]
1451 }
1452 payload = {'rrsets': [rrset]}
1453 r = self.session.patch(
1454 self.url("/api/v1/servers/localhost/zones/" + name),
1455 data=json.dumps(payload),
1456 headers={'content-type': 'application/json'})
1457 self.assert_success(r)
1458 # make sure the comments have been set, and that the NS
1459 # records are still present
1460 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1461 serverset = get_rrset(data, name, 'NS')
1462 print(serverset)
1463 self.assertNotEquals(serverset['records'], [])
1464 self.assertNotEquals(serverset['comments'], [])
1465 # verify that modified_at has been set by pdns
1466 self.assertNotEquals([c for c in serverset['comments']][0]['modified_at'], 0)
1467 # verify that TTL is correct (regression test)
1468 self.assertEquals(serverset['ttl'], 3600)
1469
1470 def test_zone_comment_delete(self):
1471 # Test: Delete ONLY comments.
1472 name, payload, zone = self.create_zone()
1473 rrset = {
1474 'changetype': 'replace',
1475 'name': name,
1476 'type': 'NS',
1477 'comments': []
1478 }
1479 payload = {'rrsets': [rrset]}
1480 r = self.session.patch(
1481 self.url("/api/v1/servers/localhost/zones/" + name),
1482 data=json.dumps(payload),
1483 headers={'content-type': 'application/json'})
1484 self.assert_success(r)
1485 # make sure the NS records are still present
1486 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1487 serverset = get_rrset(data, name, 'NS')
1488 print(serverset)
1489 self.assertNotEquals(serverset['records'], [])
1490 self.assertEquals(serverset['comments'], [])
1491
1492 def test_zone_comment_out_of_range_modified_at(self):
1493 # Test if comments on an rrset stay intact if the rrset is replaced
1494 name, payload, zone = self.create_zone()
1495 rrset = {
1496 'changetype': 'replace',
1497 'name': name,
1498 'type': 'NS',
1499 'comments': [
1500 {
1501 'account': 'test1',
1502 'content': 'oh hi there',
1503 'modified_at': '4294967297'
1504 }
1505 ]
1506 }
1507 payload = {'rrsets': [rrset]}
1508 r = self.session.patch(
1509 self.url("/api/v1/servers/localhost/zones/" + name),
1510 data=json.dumps(payload),
1511 headers={'content-type': 'application/json'})
1512 self.assertEquals(r.status_code, 422)
1513 self.assertIn("Value for key 'modified_at' is out of range", r.json()['error'])
1514
1515 def test_zone_comment_stay_intact(self):
1516 # Test if comments on an rrset stay intact if the rrset is replaced
1517 name, payload, zone = self.create_zone()
1518 # create a comment
1519 rrset = {
1520 'changetype': 'replace',
1521 'name': name,
1522 'type': 'NS',
1523 'comments': [
1524 {
1525 'account': 'test1',
1526 'content': 'oh hi there',
1527 'modified_at': 1111
1528 }
1529 ]
1530 }
1531 payload = {'rrsets': [rrset]}
1532 r = self.session.patch(
1533 self.url("/api/v1/servers/localhost/zones/" + name),
1534 data=json.dumps(payload),
1535 headers={'content-type': 'application/json'})
1536 self.assert_success(r)
1537 # replace rrset records
1538 rrset2 = {
1539 'changetype': 'replace',
1540 'name': name,
1541 'type': 'NS',
1542 'ttl': 3600,
1543 'records': [
1544 {
1545 "content": "ns1.bar.com.",
1546 "disabled": False
1547 }
1548 ]
1549 }
1550 payload2 = {'rrsets': [rrset2]}
1551 r = self.session.patch(
1552 self.url("/api/v1/servers/localhost/zones/" + name),
1553 data=json.dumps(payload2),
1554 headers={'content-type': 'application/json'})
1555 self.assert_success(r)
1556 # make sure the comments still exist
1557 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1558 serverset = get_rrset(data, name, 'NS')
1559 print(serverset)
1560 self.assertEquals(serverset['records'], rrset2['records'])
1561 self.assertEquals(serverset['comments'], rrset['comments'])
1562
1563 def test_zone_auto_ptr_ipv4_create(self):
1564 revzone = '4.2.192.in-addr.arpa.'
1565 _, _, revzonedata = self.create_zone(name=revzone)
1566 name = unique_zone_name()
1567 rrset = {
1568 "name": name,
1569 "type": "A",
1570 "ttl": 3600,
1571 "records": [{
1572 "content": "192.2.4.44",
1573 "disabled": False,
1574 "set-ptr": True,
1575 }],
1576 }
1577 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
1578 del rrset['records'][0]['set-ptr']
1579 self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
1580 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1581 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1582 print(revsets)
1583 self.assertEquals(revsets, [{
1584 u'name': u'44.4.2.192.in-addr.arpa.',
1585 u'ttl': 3600,
1586 u'type': u'PTR',
1587 u'comments': [],
1588 u'records': [{
1589 u'content': name,
1590 u'disabled': False,
1591 }],
1592 }])
1593 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1594 self.assertGreater(r['serial'], revzonedata['serial'])
1595
1596 def test_zone_auto_ptr_ipv4_update(self):
1597 revzone = '0.2.192.in-addr.arpa.'
1598 _, _, revzonedata = self.create_zone(name=revzone)
1599 name, payload, zone = self.create_zone()
1600 rrset = {
1601 'changetype': 'replace',
1602 'name': name,
1603 'type': 'A',
1604 'ttl': 3600,
1605 'records': [
1606 {
1607 "content": '192.2.0.2',
1608 "disabled": False,
1609 "set-ptr": True
1610 }
1611 ]
1612 }
1613 payload = {'rrsets': [rrset]}
1614 r = self.session.patch(
1615 self.url("/api/v1/servers/localhost/zones/" + name),
1616 data=json.dumps(payload),
1617 headers={'content-type': 'application/json'})
1618 self.assert_success(r)
1619 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1620 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1621 print(revsets)
1622 self.assertEquals(revsets, [{
1623 u'name': u'2.0.2.192.in-addr.arpa.',
1624 u'ttl': 3600,
1625 u'type': u'PTR',
1626 u'comments': [],
1627 u'records': [{
1628 u'content': name,
1629 u'disabled': False,
1630 }],
1631 }])
1632 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1633 self.assertGreater(r['serial'], revzonedata['serial'])
1634
1635 def test_zone_auto_ptr_ipv6_update(self):
1636 # 2001:DB8::bb:aa
1637 revzone = '8.b.d.0.1.0.0.2.ip6.arpa.'
1638 _, _, revzonedata = self.create_zone(name=revzone)
1639 name, payload, zone = self.create_zone()
1640 rrset = {
1641 'changetype': 'replace',
1642 'name': name,
1643 'type': 'AAAA',
1644 'ttl': 3600,
1645 'records': [
1646 {
1647 "content": '2001:DB8::bb:aa',
1648 "disabled": False,
1649 "set-ptr": True
1650 }
1651 ]
1652 }
1653 payload = {'rrsets': [rrset]}
1654 r = self.session.patch(
1655 self.url("/api/v1/servers/localhost/zones/" + name),
1656 data=json.dumps(payload),
1657 headers={'content-type': 'application/json'})
1658 self.assert_success(r)
1659 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1660 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1661 print(revsets)
1662 self.assertEquals(revsets, [{
1663 u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
1664 u'ttl': 3600,
1665 u'type': u'PTR',
1666 u'comments': [],
1667 u'records': [{
1668 u'content': name,
1669 u'disabled': False,
1670 }],
1671 }])
1672 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1673 self.assertGreater(r['serial'], revzonedata['serial'])
1674
1675 def test_search_rr_exact_zone(self):
1676 name = unique_zone_name()
1677 self.create_zone(name=name, serial=22, soa_edit_api='')
1678 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')))
1679 self.assert_success_json(r)
1680 print(r.json())
1681 self.assertEquals(r.json(), [
1682 {u'object_type': u'zone', u'name': name, u'zone_id': name},
1683 {u'content': u'ns1.example.com.',
1684 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1685 u'ttl': 3600, u'type': u'NS', u'name': name},
1686 {u'content': u'ns2.example.com.',
1687 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1688 u'ttl': 3600, u'type': u'NS', u'name': name},
1689 {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
1690 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1691 u'ttl': 3600, u'type': u'SOA', u'name': name},
1692 ])
1693
1694 def test_search_rr_exact_zone_filter_type_zone(self):
1695 name = unique_zone_name()
1696 data_type = "zone"
1697 self.create_zone(name=name, serial=22, soa_edit_api='')
1698 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
1699 self.assert_success_json(r)
1700 print(r.json())
1701 self.assertEquals(r.json(), [
1702 {u'object_type': u'zone', u'name': name, u'zone_id': name},
1703 ])
1704
1705 def test_search_rr_exact_zone_filter_type_record(self):
1706 name = unique_zone_name()
1707 data_type = "record"
1708 self.create_zone(name=name, serial=22, soa_edit_api='')
1709 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
1710 self.assert_success_json(r)
1711 print(r.json())
1712 self.assertEquals(r.json(), [
1713 {u'content': u'ns1.example.com.',
1714 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1715 u'ttl': 3600, u'type': u'NS', u'name': name},
1716 {u'content': u'ns2.example.com.',
1717 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1718 u'ttl': 3600, u'type': u'NS', u'name': name},
1719 {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
1720 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1721 u'ttl': 3600, u'type': u'SOA', u'name': name},
1722 ])
1723
1724 def test_search_rr_substring(self):
1725 name = unique_zone_name()
1726 search = name[5:-5]
1727 self.create_zone(name=name)
1728 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
1729 self.assert_success_json(r)
1730 print(r.json())
1731 # should return zone, SOA, ns1, ns2
1732 self.assertEquals(len(r.json()), 4)
1733
1734 def test_search_rr_case_insensitive(self):
1735 name = unique_zone_name()+'testsuffix.'
1736 self.create_zone(name=name)
1737 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
1738 self.assert_success_json(r)
1739 print(r.json())
1740 # should return zone, SOA, ns1, ns2
1741 self.assertEquals(len(r.json()), 4)
1742
1743 def test_search_after_rectify_with_ent(self):
1744 name = unique_zone_name()
1745 search = name.split('.')[0]
1746 rrset = {
1747 "name": 'sub.sub.' + name,
1748 "type": "A",
1749 "ttl": 3600,
1750 "records": [{
1751 "content": "4.3.2.1",
1752 "disabled": False,
1753 }],
1754 }
1755 self.create_zone(name=name, rrsets=[rrset])
1756 pdnsutil_rectify(name)
1757 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
1758 self.assert_success_json(r)
1759 print(r.json())
1760 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
1761 self.assertEquals(len(r.json()), 5)
1762
1763 def test_default_api_rectify(self):
1764 name = unique_zone_name()
1765 search = name.split('.')[0]
1766 rrsets = [
1767 {
1768 "name": 'a.' + name,
1769 "type": "AAAA",
1770 "ttl": 3600,
1771 "records": [{
1772 "content": "2001:DB8::1",
1773 "disabled": False,
1774 }],
1775 },
1776 {
1777 "name": 'b.' + name,
1778 "type": "AAAA",
1779 "ttl": 3600,
1780 "records": [{
1781 "content": "2001:DB8::2",
1782 "disabled": False,
1783 }],
1784 },
1785 ]
1786 self.create_zone(name=name, rrsets=rrsets, dnssec=True, nsec3param='1 0 1 ab')
1787 dbrecs = get_db_records(name, 'AAAA')
1788 self.assertIsNotNone(dbrecs[0]['ordername'])
1789
1790 def test_override_api_rectify(self):
1791 name = unique_zone_name()
1792 search = name.split('.')[0]
1793 rrsets = [
1794 {
1795 "name": 'a.' + name,
1796 "type": "AAAA",
1797 "ttl": 3600,
1798 "records": [{
1799 "content": "2001:DB8::1",
1800 "disabled": False,
1801 }],
1802 },
1803 {
1804 "name": 'b.' + name,
1805 "type": "AAAA",
1806 "ttl": 3600,
1807 "records": [{
1808 "content": "2001:DB8::2",
1809 "disabled": False,
1810 }],
1811 },
1812 ]
1813 self.create_zone(name=name, rrsets=rrsets, api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
1814 dbrecs = get_db_records(name, 'AAAA')
1815 self.assertIsNone(dbrecs[0]['ordername'])
1816
1817 def test_cname_at_ent_place(self):
1818 name, payload, zone = self.create_zone(dnssec=True, api_rectify=True)
1819 rrset = {
1820 'changetype': 'replace',
1821 'name': 'sub2.sub1.' + name,
1822 'type': "A",
1823 'ttl': 3600,
1824 'records': [{
1825 'content': "4.3.2.1",
1826 'disabled': False,
1827 }],
1828 }
1829 payload = {'rrsets': [rrset]}
1830 r = self.session.patch(
1831 self.url("/api/v1/servers/localhost/zones/" + zone['id']),
1832 data=json.dumps(payload),
1833 headers={'content-type': 'application/json'})
1834 self.assertEquals(r.status_code, 204)
1835 rrset = {
1836 'changetype': 'replace',
1837 'name': 'sub1.' + name,
1838 'type': "CNAME",
1839 'ttl': 3600,
1840 'records': [{
1841 'content': "www.example.org.",
1842 'disabled': False,
1843 }],
1844 }
1845 payload = {'rrsets': [rrset]}
1846 r = self.session.patch(
1847 self.url("/api/v1/servers/localhost/zones/" + zone['id']),
1848 data=json.dumps(payload),
1849 headers={'content-type': 'application/json'})
1850 self.assertEquals(r.status_code, 204)
1851
1852 def test_rrset_parameter_post_false(self):
1853 name = unique_zone_name()
1854 payload = {
1855 'name': name,
1856 'kind': 'Native',
1857 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
1858 }
1859 r = self.session.post(
1860 self.url("/api/v1/servers/localhost/zones?rrsets=false"),
1861 data=json.dumps(payload),
1862 headers={'content-type': 'application/json'})
1863 print(r.json())
1864 self.assert_success_json(r)
1865 self.assertEquals(r.status_code, 201)
1866 self.assertEquals(r.json().get('rrsets'), None)
1867
1868 def test_rrset_false_parameter(self):
1869 name = unique_zone_name()
1870 self.create_zone(name=name, kind='Native')
1871 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false"))
1872 self.assert_success_json(r)
1873 print(r.json())
1874 self.assertEquals(r.json().get('rrsets'), None)
1875
1876 def test_rrset_true_parameter(self):
1877 name = unique_zone_name()
1878 self.create_zone(name=name, kind='Native')
1879 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true"))
1880 self.assert_success_json(r)
1881 print(r.json())
1882 self.assertEquals(len(r.json().get('rrsets')), 2)
1883
1884 def test_wrong_rrset_parameter(self):
1885 name = unique_zone_name()
1886 self.create_zone(name=name, kind='Native')
1887 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=foobar"))
1888 self.assertEquals(r.status_code, 422)
1889 self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
1890
1891 def test_put_master_tsig_key_ids_non_existent(self):
1892 name = unique_zone_name()
1893 keyname = unique_zone_name().split('.')[0]
1894 self.create_zone(name=name, kind='Native')
1895 payload = {
1896 'master_tsig_key_ids': [keyname]
1897 }
1898 r = self.session.put(self.url('/api/v1/servers/localhost/zones/' + name),
1899 data=json.dumps(payload),
1900 headers={'content-type': 'application/json'})
1901 self.assertEquals(r.status_code, 422)
1902 self.assertIn('A TSIG key with the name', r.json()['error'])
1903
1904 def test_put_slave_tsig_key_ids_non_existent(self):
1905 name = unique_zone_name()
1906 keyname = unique_zone_name().split('.')[0]
1907 self.create_zone(name=name, kind='Native')
1908 payload = {
1909 'slave_tsig_key_ids': [keyname]
1910 }
1911 r = self.session.put(self.url('/api/v1/servers/localhost/zones/' + name),
1912 data=json.dumps(payload),
1913 headers={'content-type': 'application/json'})
1914 self.assertEquals(r.status_code, 422)
1915 self.assertIn('A TSIG key with the name', r.json()['error'])
1916
1917
1918 @unittest.skipIf(not is_auth(), "Not applicable")
1919 class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
1920
1921 def setUp(self):
1922 super(AuthRootZone, self).setUp()
1923 # zone name is not unique, so delete the zone before each individual test.
1924 self.session.delete(self.url("/api/v1/servers/localhost/zones/=2E"))
1925
1926 def test_create_zone(self):
1927 name, payload, data = self.create_zone(name='.', serial=22, soa_edit_api='')
1928 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
1929 self.assertIn(k, data)
1930 if k in payload:
1931 self.assertEquals(data[k], payload[k])
1932 # validate generated SOA
1933 rec = get_first_rec(data, '.', 'SOA')
1934 self.assertEquals(
1935 rec['content'],
1936 "a.misconfigured.powerdns.server. hostmaster. " + str(payload['serial']) +
1937 " 10800 3600 604800 3600"
1938 )
1939 # Regression test: verify zone list works
1940 zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
1941 print("zonelist:", zonelist)
1942 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
1943 # Also test that fetching the zone works.
1944 print("id:", data['id'])
1945 self.assertEquals(data['id'], '=2E')
1946 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
1947 print("zone (fetched):", data)
1948 for k in ('name', 'kind'):
1949 self.assertIn(k, data)
1950 self.assertEquals(data[k], payload[k])
1951 self.assertEqual(data['rrsets'][0]['name'], '.')
1952
1953 def test_update_zone(self):
1954 name, payload, zone = self.create_zone(name='.')
1955 zone_id = '=2E'
1956 # update, set as Master and enable SOA-EDIT-API
1957 payload = {
1958 'kind': 'Master',
1959 'masters': ['192.0.2.1', '192.0.2.2'],
1960 'soa_edit_api': 'EPOCH',
1961 'soa_edit': 'EPOCH'
1962 }
1963 r = self.session.put(
1964 self.url("/api/v1/servers/localhost/zones/" + zone_id),
1965 data=json.dumps(payload),
1966 headers={'content-type': 'application/json'})
1967 self.assert_success(r)
1968 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
1969 for k in payload.keys():
1970 self.assertIn(k, data)
1971 self.assertEquals(data[k], payload[k])
1972 # update, back to Native and empty(off)
1973 payload = {
1974 'kind': 'Native',
1975 'soa_edit_api': '',
1976 'soa_edit': ''
1977 }
1978 r = self.session.put(
1979 self.url("/api/v1/servers/localhost/zones/" + zone_id),
1980 data=json.dumps(payload),
1981 headers={'content-type': 'application/json'})
1982 self.assert_success(r)
1983 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
1984 for k in payload.keys():
1985 self.assertIn(k, data)
1986 self.assertEquals(data[k], payload[k])
1987
1988
1989 @unittest.skipIf(not is_recursor(), "Not applicable")
1990 class RecursorZones(ApiTestCase):
1991
1992 def create_zone(self, name=None, kind=None, rd=False, servers=None):
1993 if name is None:
1994 name = unique_zone_name()
1995 if servers is None:
1996 servers = []
1997 payload = {
1998 'name': name,
1999 'kind': kind,
2000 'servers': servers,
2001 'recursion_desired': rd
2002 }
2003 r = self.session.post(
2004 self.url("/api/v1/servers/localhost/zones"),
2005 data=json.dumps(payload),
2006 headers={'content-type': 'application/json'})
2007 self.assert_success_json(r)
2008 return payload, r.json()
2009
2010 def test_create_auth_zone(self):
2011 payload, data = self.create_zone(kind='Native')
2012 for k in payload.keys():
2013 self.assertEquals(data[k], payload[k])
2014
2015 def test_create_zone_no_name(self):
2016 payload = {
2017 'name': '',
2018 'kind': 'Native',
2019 'servers': ['8.8.8.8'],
2020 'recursion_desired': False,
2021 }
2022 print(payload)
2023 r = self.session.post(
2024 self.url("/api/v1/servers/localhost/zones"),
2025 data=json.dumps(payload),
2026 headers={'content-type': 'application/json'})
2027 self.assertEquals(r.status_code, 422)
2028 self.assertIn('is not canonical', r.json()['error'])
2029
2030 def test_create_forwarded_zone(self):
2031 payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
2032 # return values are normalized
2033 payload['servers'][0] += ':53'
2034 for k in payload.keys():
2035 self.assertEquals(data[k], payload[k])
2036
2037 def test_create_forwarded_rd_zone(self):
2038 payload, data = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
2039 # return values are normalized
2040 payload['servers'][0] += ':53'
2041 for k in payload.keys():
2042 self.assertEquals(data[k], payload[k])
2043
2044 def test_create_auth_zone_with_symbols(self):
2045 payload, data = self.create_zone(name='foo/bar.'+unique_zone_name(), kind='Native')
2046 expected_id = (payload['name'].replace('/', '=2F'))
2047 for k in payload.keys():
2048 self.assertEquals(data[k], payload[k])
2049 self.assertEquals(data['id'], expected_id)
2050
2051 def test_rename_auth_zone(self):
2052 payload, data = self.create_zone(kind='Native')
2053 name = payload['name']
2054 # now rename it
2055 payload = {
2056 'name': 'renamed-'+name,
2057 'kind': 'Native',
2058 'recursion_desired': False
2059 }
2060 r = self.session.put(
2061 self.url("/api/v1/servers/localhost/zones/" + name),
2062 data=json.dumps(payload),
2063 headers={'content-type': 'application/json'})
2064 self.assert_success(r)
2065 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + payload['name'])).json()
2066 for k in payload.keys():
2067 self.assertEquals(data[k], payload[k])
2068
2069 def test_zone_delete(self):
2070 payload, zone = self.create_zone(kind='Native')
2071 name = payload['name']
2072 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
2073 self.assertEquals(r.status_code, 204)
2074 self.assertNotIn('Content-Type', r.headers)
2075
2076 def test_search_rr_exact_zone(self):
2077 name = unique_zone_name()
2078 self.create_zone(name=name, kind='Native')
2079 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name))
2080 self.assert_success_json(r)
2081 print(r.json())
2082 self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name}])
2083
2084 def test_search_rr_substring(self):
2085 name = 'search-rr-zone.name.'
2086 self.create_zone(name=name, kind='Native')
2087 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
2088 self.assert_success_json(r)
2089 print(r.json())
2090 # should return zone, SOA
2091 self.assertEquals(len(r.json()), 2)
2092
2093 @unittest.skipIf(not is_auth(), "Not applicable")
2094 class AuthZoneKeys(ApiTestCase, AuthZonesHelperMixin):
2095
2096 def test_get_keys(self):
2097 r = self.session.get(
2098 self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2099 self.assert_success_json(r)
2100 keys = r.json()
2101 self.assertGreater(len(keys), 0)
2102
2103 key0 = deepcopy(keys[0])
2104 del key0['dnskey']
2105 del key0['ds']
2106 expected = {
2107 u'algorithm': u'ECDSAP256SHA256',
2108 u'bits': 256,
2109 u'active': True,
2110 u'type': u'Cryptokey',
2111 u'keytype': u'csk',
2112 u'flags': 257,
2113 u'id': 1}
2114 self.assertEquals(key0, expected)
2115
2116 keydata = keys[0]['dnskey'].split()
2117 self.assertEqual(len(keydata), 4)