]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
rec: Add a regression test for the RPZ updates with several deltas
[thirdparty/pdns.git] / regression-tests.api / test_Zones.py
1 from __future__ import print_function
2 import json
3 import time
4 import unittest
5 from copy import deepcopy
6 from parameterized import parameterized
7 from pprint import pprint
8 from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor, get_db_records, pdnsutil_rectify
9
10
def get_rrset(data, qname, qtype):
    """Return the first rrset in ``data['rrsets']`` with the given name and type.

    Returns ``None`` when no rrset has ``name == qname`` and ``type == qtype``.
    """
    matching = (
        candidate for candidate in data['rrsets']
        if candidate['name'] == qname and candidate['type'] == qtype
    )
    return next(matching, None)
16
17
def get_first_rec(data, qname, qtype):
    """Return the first record of the rrset matching ``qname`` and ``qtype``.

    The rrset is looked up with ``get_rrset``; ``None`` is returned when that
    lookup yields nothing (or a falsy value).
    """
    found = get_rrset(data, qname, qtype)
    return found['records'][0] if found else None
23
24
def eq_zone_rrsets(rrsets, expected):
    """Assert that ``rrsets`` holds exactly the records listed in ``expected``.

    ``expected`` maps a record type to a list of dicts with a ``content`` key
    and, optionally, a ``name`` key.  For each type both sides are reduced to
    sets of ``(name, type, content)`` tuples; when none of the expected
    records of a type carries a ``name``, the name is replaced by ``'@'`` on
    both sides so that only type and content are compared.  Types that do not
    appear in ``expected`` are not compared.

    Raises ``AssertionError`` if the reduced data differ.
    """
    got_by_type = {}
    wanted_by_type = {}
    for raw_type, wanted_records in expected.items():
        rtype = str(raw_type)
        with_names = any('name' in wanted for wanted in wanted_records)

        # minify + convert received data
        got = set()
        for rrset in rrsets:
            if rrset['type'] != rtype:
                continue
            print(rrset)
            got.update(
                (rrset['name'] if with_names else '@', rrset['type'], rec['content'])
                for rec in rrset['records']
            )
        got_by_type[rtype] = got

        # minify expected data
        wanted_by_type[rtype] = set(
            (wanted['name'] if with_names else '@', rtype, wanted['content'])
            for wanted in wanted_records
        )

    print("eq_zone_rrsets: got:")
    pprint(got_by_type)
    print("eq_zone_rrsets: expected:")
    pprint(wanted_by_type)

    assert got_by_type == wanted_by_type, "%r != %r" % (got_by_type, wanted_by_type)
48
49
class Zones(ApiTestCase):
    """Zone-list tests, with checks selected by ``is_auth()`` / ``is_recursor()``."""

    def _test_list_zones(self, dnssec=True):
        """List all zones and check the fields of the ``example.com`` entry.

        :param dnssec: when false, ``?dnssec=false`` is appended to the request
            and (on auth) the entry must not contain a ``dnssec`` field.
        """
        path = "/api/v1/servers/localhost/zones"
        if not dnssec:
            path = path + "?dnssec=false"
        r = self.session.get(self.url(path))
        self.assert_success_json(r)
        domains = r.json()
        example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
        self.assertEqual(len(example_com), 1)
        example_com = example_com[0]
        print(example_com)
        required_fields = ['id', 'url', 'name', 'kind']
        if is_auth():
            required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial', 'account']
            if dnssec:
                # Extend (not replace) the list, so the common fields above
                # are still verified when dnssec information is requested.
                required_fields = required_fields + ['dnssec', 'edited_serial']
            self.assertNotEqual(example_com['serial'], 0)
            if not dnssec:
                self.assertNotIn('dnssec', example_com)
        elif is_recursor():
            required_fields = required_fields + ['recursion_desired', 'servers']
        for field in required_fields:
            self.assertIn(field, example_com)

    def test_list_zones_with_dnssec(self):
        """Run the listing check with dnssec fields; only executed on auth."""
        if is_auth():
            self._test_list_zones(True)

    def test_list_zones_without_dnssec(self):
        """Run the listing check with ``?dnssec=false``."""
        self._test_list_zones(False)
82
class AuthZonesHelperMixin(object):
    """Mixin providing ``create_zone`` for test cases that need a fresh zone."""

    def create_zone(self, name=None, **kwargs):
        """POST a new zone and return ``(name, payload, reply)``.

        :param name: zone name; a unique one is generated when ``None``.
        :param kwargs: extra payload fields.  A value of ``None`` removes the
            field from the default payload (e.g. ``nameservers=None``); a
            ``None`` for a field that is not in the payload is ignored.
        :returns: the zone name, the payload that was sent and the decoded
            JSON reply.

        Asserts that the request succeeded with status 201.
        """
        if name is None:
            name = unique_zone_name()
        payload = {
            'name': name,
            'kind': 'Native',
            'nameservers': ['ns1.example.com.', 'ns2.example.com.']
        }
        for k, v in kwargs.items():
            if v is None:
                # pop() rather than del: the key may not be a default field.
                payload.pop(k, None)
            else:
                payload[k] = v
        print("sending", payload)
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success_json(r)
        self.assertEqual(r.status_code, 201)
        reply = r.json()
        print("reply", reply)
        return name, payload, reply
107
108
109 @unittest.skipIf(not is_auth(), "Not applicable")
110 class AuthZones(ApiTestCase, AuthZonesHelperMixin):
111
def test_create_zone(self):
    """Create a zone with a fixed serial and check the reply and the stored SOA."""
    # soa_edit_api has a default, override with empty for this test
    name, payload, data = self.create_zone(serial=22, soa_edit_api='')
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial',
              'edited_serial', 'soa_edit_api', 'soa_edit', 'account'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
    # validate generated SOA
    expected_soa = "a.misconfigured.powerdns.server. hostmaster." + name + " " + \
        str(payload['serial']) + " 10800 3600 604800 3600"
    self.assertEqual(
        get_first_rec(data, name, 'SOA')['content'],
        expected_soa
    )
    # Because we had confusion about dots, check that the DB is without dots.
    dbrecs = get_db_records(name, 'SOA')
    self.assertEqual(dbrecs[0]['content'], expected_soa.replace('. ', ' '))
    self.assertNotEqual(data['serial'], data['edited_serial'])
130
def test_create_zone_with_soa_edit_api(self):
    """Check that ``soa_edit_api='EPOCH'`` overrides the serial passed at creation."""
    # soa_edit_api wins over serial
    name, payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
    for k in ('soa_edit_api', ):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
    # generated EPOCH serial surely is > fixed serial we passed in
    print(data)
    self.assertGreater(data['serial'], payload['serial'])
    soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
    self.assertGreater(soa_serial, payload['serial'])
    self.assertEqual(soa_serial, data['serial'])
144
def test_create_zone_with_account(self):
    """Check that the ``account`` given at creation is echoed in the reply."""
    name, payload, data = self.create_zone(account='anaccount', serial=10)
    print(data)
    for k in ('account', ):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
153
def test_create_zone_default_soa_edit_api(self):
    """Check that a zone created without ``soa_edit_api`` reports ``'DEFAULT'``."""
    name, payload, data = self.create_zone()
    print(data)
    self.assertEqual(data['soa_edit_api'], 'DEFAULT')
158
def test_create_zone_exists(self):
    """Check that creating a zone whose name already exists returns 409."""
    name, payload, data = self.create_zone()
    print(data)
    payload = {
        'name': name,
        'kind': 'Native'
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 409)  # Conflict - already exists
172
def test_create_zone_with_soa_edit(self):
    """Check the SOA serial suffix before and after one rrset change.

    The zone is created with ``soa_edit='INCEPTION-INCREMENT'`` and
    ``soa_edit_api='SOA-EDIT-INCREASE'``; the serial must end in ``01``
    after creation and in ``02`` after an A rrset has been patched in.
    """
    name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
    print(data)
    self.assertEqual(data['soa_edit'], 'INCEPTION-INCREMENT')
    self.assertEqual(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    # These particular settings lead to the first serial set to YYYYMMDD01.
    self.assertEqual(soa_serial[-2:], '01')
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": "127.0.0.1",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + data['id']),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
    data = r.json()
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    self.assertEqual(soa_serial[-2:], '02')
202
def test_create_zone_with_records(self):
    """Create a zone with an A rrset at the apex and check it is in the reply."""
    name = unique_zone_name()
    rrset = {
        "name": name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "4.3.2.1",
            "disabled": False,
        }],
    }
    name, payload, data = self.create_zone(name=name, rrsets=[rrset])
    # check our record has appeared
    self.assertEqual(get_rrset(data, name, 'A')['records'], rrset['records'])
217
def test_create_zone_with_wildcard_records(self):
    """Create a zone with a ``*.<zone>`` A rrset and check it is in the reply."""
    name = unique_zone_name()
    rrset = {
        "name": "*." + name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "4.3.2.1",
            "disabled": False,
        }],
    }
    name, payload, data = self.create_zone(name=name, rrsets=[rrset])
    # check our record has appeared
    self.assertEqual(get_rrset(data, rrset['name'], 'A')['records'], rrset['records'])
232
def test_create_zone_with_comments(self):
    """Create a zone whose rrsets carry comments and check they are returned.

    The SOA entry supplies comments only (with a lowercase type), the AAAA
    entry supplies records and comments, TXT and A supply records only.
    """
    name = unique_zone_name()
    rrsets = [
        {
            "name": name,
            "type": "soa",  # test uppercasing of type, too.
            "comments": [{
                "account": "test1",
                "content": "blah blah",
                "modified_at": 11112,
            }],
        },
        {
            "name": name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::1",
                "disabled": False,
            }],
            "comments": [{
                "account": "test AAAA",
                "content": "blah blah AAAA",
                "modified_at": 11112,
            }],
        },
        {
            "name": name,
            "type": "TXT",
            "ttl": 3600,
            "records": [{
                "content": "\"test TXT\"",
                "disabled": False,
            }],
        },
        {
            "name": name,
            "type": "A",
            "ttl": 3600,
            "records": [{
                "content": "192.0.2.1",
                "disabled": False,
            }],
        },
    ]
    name, payload, data = self.create_zone(name=name, rrsets=rrsets)
    # NS records have been created
    self.assertEqual(len(data['rrsets']), len(rrsets) + 1)
    # check our comment has appeared
    self.assertEqual(get_rrset(data, name, 'SOA')['comments'], rrsets[0]['comments'])
    self.assertEqual(get_rrset(data, name, 'A')['comments'], [])
    self.assertEqual(get_rrset(data, name, 'TXT')['comments'], [])
    self.assertEqual(get_rrset(data, name, 'AAAA')['comments'], rrsets[1]['comments'])
286
def test_create_zone_uncanonical_nameservers(self):
    """Check that a nameserver without trailing dot is rejected with 422."""
    name = unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['uncanon.example.com']
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Nameserver is not canonical', r.json()['error'])
301
def test_create_auth_zone_no_name(self):
    """Check that an empty zone name is rejected with 422 ('is not canonical')."""
    payload = {
        'name': '',
        'kind': 'Native',
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('is not canonical', r.json()['error'])
315
def test_create_zone_with_custom_soa(self):
    """Create a zone with a caller-supplied SOA and check the reply and the DB row."""
    name = unique_zone_name()
    content = u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
    rrset = {
        "name": name,
        "type": "soa",  # test uppercasing of type, too.
        "ttl": 3600,
        "records": [{
            "content": content,
            "disabled": False,
        }],
    }
    name, payload, data = self.create_zone(name=name, rrsets=[rrset], soa_edit_api='')
    self.assertEqual(get_rrset(data, name, 'SOA')['records'], rrset['records'])
    dbrecs = get_db_records(name, 'SOA')
    # The DB row is expected without the trailing dots on the names.
    self.assertEqual(dbrecs[0]['content'], content.replace('. ', ' '))
332
def test_create_zone_double_dot(self):
    """Check that a zone name containing ``..`` is rejected with 422."""
    name = 'test..' + unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.']
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Unable to parse DNS Name', r.json()['error'])
347
def test_create_zone_restricted_chars(self):
    """Check that a zone name containing ``:`` is rejected with 422."""
    name = 'test:' + unique_zone_name()  # : isn't good as a name.
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com']
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('contains unsupported characters', r.json()['error'])
362
def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self):
    """Check that ``nameservers`` plus an apex NS rrset is rejected with 422."""
    name = unique_zone_name()
    rrset = {
        "name": name,
        "type": "NS",
        "ttl": 3600,
        "records": [{
            "content": "ns2.example.com.",
            "disabled": False,
        }],
    }
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.'],
        'rrsets': [rrset],
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r.json()['error'])
387
def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self):
    """Check that ``nameservers`` plus an NS rrset below the apex is accepted."""
    zone_name = unique_zone_name()
    delegation = {
        "name": 'subzone.' + zone_name,
        "type": "NS",
        "ttl": 3600,
        "records": [{"content": "ns2.example.com.", "disabled": False}],
    }
    payload = {
        'name': zone_name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.'],
        'rrsets': [delegation],
    }
    print(payload)
    zones_url = self.url("/api/v1/servers/localhost/zones")
    response = self.session.post(
        zones_url,
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
411
def test_create_zone_with_symbols(self):
    """Create a zone with ``/`` in its name and check the id encodes it as ``=2F``."""
    name, payload, data = self.create_zone(name='foo/bar.' + unique_zone_name())
    name = payload['name']
    expected_id = name.replace('/', '=2F')
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
    self.assertEqual(data['id'], expected_id)
    dbrecs = get_db_records(name, 'SOA')
    self.assertEqual(dbrecs[0]['name'], name.rstrip('.'))
423
def test_create_zone_with_nameservers_non_string(self):
    """Check that a non-string entry in ``nameservers`` yields 422."""
    # ensure we don't crash
    name = unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': [{'a': 'ns1.example.com'}]  # invalid
    }
    print(payload)
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
438
def test_create_zone_with_dnssec(self):
    """
    Create a zone with "dnssec" set and see if a key was made.

    The creation reply must echo ``dnssec``; the zone must be fetchable and
    its cryptokeys list must hold exactly one active ``csk`` key.
    """
    name, payload, data = self.create_zone(dnssec=True)

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
    self.assert_success_json(r)

    for k in ('dnssec', ):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))

    keys = r.json()

    print(keys)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(keys), 1)
    self.assertEqual(keys[0]['type'], 'Cryptokey')
    self.assertEqual(keys[0]['active'], True)
    self.assertEqual(keys[0]['keytype'], 'csk')
464
def test_create_zone_with_dnssec_disable_dnssec(self):
    """
    Create a zone with "dnssec", then set "dnssec" to false and see if the
    keys are gone
    """
    name, payload, data = self.create_zone(dnssec=True)

    self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
                     data=json.dumps({'dnssec': False}))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    zoneinfo = r.json()

    self.assertEqual(r.status_code, 200)
    self.assertEqual(zoneinfo['dnssec'], False)

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))

    keys = r.json()

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(keys), 0)
488
def test_create_zone_with_nsec3param(self):
    """
    Create a zone with "nsec3param" set and see if the metadata was added.
    """
    nsec3param = '1 0 500 aabbccddeeff'
    name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
    self.assert_success_json(r)

    for k in ('dnssec', 'nsec3param'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3PARAM'))

    data = r.json()

    print(data)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(data['metadata']), 1)
    self.assertEqual(data['kind'], 'NSEC3PARAM')
    self.assertEqual(data['metadata'][0], nsec3param)
514
def test_create_zone_with_nsec3narrow(self):
    """
    Create a zone with "nsec3narrow" set and see if the metadata was added.
    """
    nsec3param = '1 0 500 aabbccddeeff'
    name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
                                           nsec3narrow=True)

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
    self.assert_success_json(r)

    for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])

    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3NARROW'))

    data = r.json()

    print(data)

    self.assertEqual(r.status_code, 200)
    self.assertEqual(len(data['metadata']), 1)
    self.assertEqual(data['kind'], 'NSEC3NARROW')
    self.assertEqual(data['metadata'][0], '1')
541
def test_create_zone_dnssec_serial(self):
    """
    Create a zone set/unset "dnssec" and see if the serial was increased
    after every step
    """
    name, payload, data = self.create_zone()

    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
    self.assertEqual(soa_serial[-2:], '01')

    self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
                     data=json.dumps({'dnssec': True}))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    data = r.json()
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]

    self.assertEqual(r.status_code, 200)
    self.assertEqual(soa_serial[-2:], '02')

    self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
                     data=json.dumps({'dnssec': False}))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))

    data = r.json()
    soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]

    self.assertEqual(r.status_code, 200)
    self.assertEqual(soa_serial[-2:], '03')
572
def test_zone_absolute_url(self):
    """Check that the first listed zone's ``url`` starts with ``/api/v``."""
    self.create_zone()
    listing = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
    first_zone = listing[0]
    print(first_zone)
    self.assertTrue(first_zone['url'].startswith('/api/v'))
579
def test_create_zone_metadata(self):
    """POST AXFR-SOURCE metadata for example.com and check the 201 reply."""
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    r = self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
                          data=json.dumps(payload_metadata))
    rdata = r.json()
    self.assertEqual(r.status_code, 201)
    self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
587
def test_create_zone_metadata_kind(self):
    """PUT metadata to the AXFR-SOURCE kind URL and check the 200 reply."""
    payload_metadata = {"metadata": ["127.0.0.2"]}
    r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
                         data=json.dumps(payload_metadata))
    rdata = r.json()
    self.assertEqual(r.status_code, 200)
    self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
595
def test_create_protected_zone_metadata(self):
    """Check that PUT on each protected metadata kind is answered with 422."""
    # test whether it prevents modification of certain kinds
    body = json.dumps({"metadata": ["FOO", "BAR"]})
    for k in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
        r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k),
                             data=body)
        self.assertEqual(r.status_code, 422)
603
def test_retrieve_zone_metadata(self):
    """POST AXFR-SOURCE metadata, then check it appears in the metadata listing."""
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
                      data=json.dumps(payload_metadata))
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata"))
    rdata = r.json()
    self.assertEqual(r.status_code, 200)
    self.assertIn(payload_metadata, rdata)
612
def test_delete_zone_metadata(self):
    """DELETE the AXFR-SOURCE metadata and check a following GET returns an empty list."""
    r = self.session.delete(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
    self.assertEqual(r.status_code, 200)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
    rdata = r.json()
    self.assertEqual(r.status_code, 200)
    self.assertEqual(rdata["metadata"], [])
620
def test_create_external_zone_metadata(self):
    """PUT metadata under the custom kind ``X-MYMETA`` and check it is echoed."""
    payload_metadata = {"metadata": ["My very important message"]}
    r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
                         data=json.dumps(payload_metadata))
    self.assertEqual(r.status_code, 200)
    rdata = r.json()
    self.assertEqual(rdata["metadata"], payload_metadata["metadata"])
628
def test_create_metadata_in_non_existent_zone(self):
    """Check that posting metadata to an unknown zone returns 404."""
    payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
    r = self.session.post(self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
                          data=json.dumps(payload_metadata))
    self.assertEqual(r.status_code, 404)
    # Note: errors should probably contain json (see #5988)
    # self.assertIn('Could not find domain ', r.json()['error'])
636
def test_create_slave_zone(self):
    """Create a Slave zone without nameservers; check the reply, the list and the fetched zone."""
    # Test that nameservers can be absent for slave zones.
    name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    for k in ('name', 'masters', 'kind'):
        self.assertIn(k, data)
        self.assertEqual(data[k], payload[k])
    print("payload:", payload)
    print("data:", data)
    # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
    r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    zonelist = r.json()
    print("zonelist:", zonelist)
    self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
    # Also test that fetching the zone works.
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
    data = r.json()
    print("zone (fetched):", data)
    for k in ('name', 'masters', 'kind'):
        self.assertIn(k, data)
        self.assertEqual(data[k], payload[k])
    self.assertEqual(data['serial'], 0)
    self.assertEqual(data['rrsets'], [])
659
def test_find_zone_by_name(self):
    """Create a zone with ``/`` in its name and look it up via ``?zone=<name>``."""
    name = 'foo/' + unique_zone_name()
    name, payload, data = self.create_zone(name=name)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones?zone=" + name))
    data = r.json()
    print(data)
    self.assertEqual(data[0]['name'], name)
667
def test_delete_slave_zone(self):
    """Create a Slave zone and delete it again; any HTTP error status fails the test."""
    _, _, zone = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    zone_url = self.url("/api/v1/servers/localhost/zones/" + zone['id'])
    self.session.delete(zone_url).raise_for_status()
672
def test_retrieve_slave_zone(self):
    """Request an AXFR retrieve for a new Slave zone and check the result message."""
    _, payload, zone = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
    print("payload:", payload)
    print("data:", zone)
    retrieve_url = self.url("/api/v1/servers/localhost/zones/" + zone['id'] + "/axfr-retrieve")
    status = self.session.put(retrieve_url).json()
    print("status for axfr-retrieve:", status)
    expected_result = u"Added retrieval request for '%s' from master 127.0.0.2" % payload['name']
    self.assertEqual(status['result'], expected_result)
682
def test_notify_master_zone(self):
    """Request a notify for a new Master zone and check the result message."""
    _, payload, zone = self.create_zone(kind='Master')
    print("payload:", payload)
    print("data:", zone)
    notify_url = self.url("/api/v1/servers/localhost/zones/" + zone['id'] + "/notify")
    status = self.session.put(notify_url).json()
    print("status for notify:", status)
    self.assertEqual(status['result'], 'Notification queued')
691
def test_get_zone_with_symbols(self):
    """Fetch a zone with ``/`` in its name through its ``=2F``-encoded id."""
    name, payload, data = self.create_zone(name='foo/bar.' + unique_zone_name())
    name = payload['name']
    zone_id = name.replace('/', '=2F')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id))
    data = r.json()
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
        self.assertIn(k, data)
        if k in payload:
            self.assertEqual(data[k], payload[k])
702
def test_get_zone(self):
    """Find ``example.com.`` in the zone list and fetch it by id."""
    r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    domains = r.json()
    example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id']))
    self.assert_success_json(r)
    data = r.json()
    for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
        self.assertIn(k, data)
    self.assertEqual(data['name'], 'example.com.')
713
def test_import_zone_broken(self):
    """Check that importing malformed zone text is rejected with 422.

    NOTE(review): the ``flags:`` line is not commented out with ``;;`` --
    presumably that is what makes this input invalid; confirm.
    """
    payload = {
        'name': 'powerdns-broken.com',
        'kind': 'Master',
        'nameservers': [],
    }
    payload['zone'] = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1680
;; QUESTION SECTION:
;powerdns.com. IN SOA

;; ANSWER SECTION:
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
powerdns-broken.com. 86400 IN A 82.94.213.34
powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
744
def test_import_zone_axfr_outofzone(self):
    """Check that zone text containing a record outside the zone is rejected with 422."""
    # Ensure we don't create out-of-zone records
    payload = {
        'name': unique_zone_name(),
        'kind': 'Master',
        'nameservers': [],
    }
    payload['zone'] = """
%NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
%NAME% 3600 IN NS powerdnssec2.ds9a.nl.
example.org. 3600 IN AAAA 2001:888:2000:1d::2
%NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
""".replace('%NAME%', payload['name'])
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertEqual(r.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
764
def test_import_zone_axfr(self):
    """Import AXFR-style zone text and check the resulting rrsets and one DB row."""
    payload = {
        'name': 'powerdns.com.',
        'kind': 'Master',
        'nameservers': [],
        'soa_edit_api': '',  # turn off so exact SOA comparison works.
    }
    payload['zone'] = """
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
;; WARNING: recursion requested but not available

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 1680
;; QUESTION SECTION:
;powerdns.com. IN SOA

;; ANSWER SECTION:
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
powerdns.com. 86400 IN A 82.94.213.34
powerdns.com. 3600 IN MX 0 xs.powerdns.com.
powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
"""
    zones_url = self.url("/api/v1/servers/localhost/zones")
    response = self.session.post(
        zones_url,
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success_json(response)
    zone = response.json()
    self.assertIn('name', zone)

    # Only A and AAAA carry a name, so only those are compared by owner name.
    expected = {
        'NS': [
            {'content': 'powerdnssec1.ds9a.nl.'},
            {'content': 'powerdnssec2.ds9a.nl.'},
        ],
        'SOA': [
            {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
        ],
        'MX': [
            {'content': '0 xs.powerdns.com.'},
        ],
        'A': [
            {'content': '82.94.213.34', 'name': 'powerdns.com.'},
        ],
        'AAAA': [
            {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
        ],
    }

    eq_zone_rrsets(zone['rrsets'], expected)

    # check content in DB is stored WITHOUT trailing dot.
    ns_rows = get_db_records(payload['name'], 'NS')
    sec1_row = next(row for row in ns_rows if row['content'].startswith('powerdnssec1'))
    self.assertEqual(sec1_row['content'], 'powerdnssec1.ds9a.nl')
824
def test_import_zone_bind(self):
    """Import a BIND-style zone file and check the resulting rrsets."""
    payload = {
        'name': 'example.org.',
        'kind': 'Master',
        'nameservers': [],
        'soa_edit_api': '',  # turn off so exact SOA comparison works.
    }
    # In a master file a line that starts with whitespace reuses the owner
    # name of the previous record, so the NS/MX lines below MUST stay
    # indented; at column 0 their first token would be read as an owner name.
    payload['zone'] = """
$TTL 86400 ; 24 hours could have been written as 24h or 1d
; $TTL used for all RRs without explicit TTL value
$ORIGIN example.org.
@ 1D IN SOA ns1.example.org. hostmaster.example.org. (
    2002022401 ; serial
    3H ; refresh
    15 ; retry
    1w ; expire
    3h ; minimum
    )
    IN NS ns1.example.org. ; in the domain
    IN NS ns2.smokeyjoe.com. ; external to domain
    IN MX 10 mail.another.com. ; external mail provider
; server host definitions
ns1 IN A 192.168.0.1 ;name server definition
www IN A 192.168.0.2 ;web server definition
ftp IN CNAME www.example.org. ;ftp server definition
; non server domain hosts
bill IN A 192.168.0.3
fred IN A 192.168.0.4
"""
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success_json(r)
    data = r.json()
    self.assertIn('name', data)

    expected = {
        'NS': [
            {'content': 'ns1.example.org.'},
            {'content': 'ns2.smokeyjoe.com.'},
        ],
        'SOA': [
            {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
        ],
        'MX': [
            {'content': '10 mail.another.com.'},
        ],
        'A': [
            {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
            {'content': '192.168.0.2', 'name': 'www.example.org.'},
            {'content': '192.168.0.3', 'name': 'bill.example.org.'},
            {'content': '192.168.0.4', 'name': 'fred.example.org.'},
        ],
        'CNAME': [
            {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
        ],
    }

    eq_zone_rrsets(data['rrsets'], expected)
885
def test_import_zone_bind_cname_apex(self):
    """Check that a zone file with a CNAME next to SOA/NS at the apex is rejected with 422."""
    payload = {
        'name': unique_zone_name(),
        'kind': 'Master',
        'nameservers': [],
    }
    payload['zone'] = """
$ORIGIN %NAME%
@ IN SOA ns1.example.org. hostmaster.example.org. (2002022401 3H 15 1W 3H)
@ IN NS ns1.example.org.
@ IN NS ns2.smokeyjoe.com.
@ IN CNAME www.example.org.
""".replace('%NAME%', payload['name'])
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Conflicts with another RRset', r.json()['error'])
905
def test_export_zone_json(self):
    """Export a fresh zone as JSON and compare the ``zone`` text line by line."""
    name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    r = self.session.get(
        self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
        headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
    )
    self.assert_success_json(r)
    data = r.json()
    self.assertIn('zone', data)
    expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
                     name + '\t3600\tIN\tNS\tns2.foo.com.',
                     name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
                     ' 0 10800 3600 604800 3600']
    self.assertEqual(data['zone'].strip().split('\n'), expected_data)
921
def test_export_zone_text(self):
    # Export a freshly created zone with a wildcard Accept header and compare
    # the raw response text line by line.
    name, _, _ = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
    # export it
    r = self.session.get(
        self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
        headers={'accept': '*/*'}
    )
    data = r.text.strip().split("\n")
    expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
                     name + '\t3600\tIN\tNS\tns2.foo.com.',
                     name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
                     ' 0 10800 3600 604800 3600']
    self.assertEqual(data, expected_data)
935
def test_update_zone(self):
    # PUT zone settings twice and check after each PUT that a GET of the zone
    # reports every key that was sent with the value that was sent.
    name, payload, _ = self.create_zone()
    name = payload['name']
    # update, set as Master and enable SOA-EDIT-API
    payload = {
        'kind': 'Master',
        'masters': ['192.0.2.1', '192.0.2.2'],
        'soa_edit_api': 'EPOCH',
        'soa_edit': 'EPOCH'
    }
    r = self.session.put(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    for k in payload.keys():
        self.assertIn(k, data)
        self.assertEqual(data[k], payload[k])
    # update, back to Native and empty(off)
    payload = {
        'kind': 'Native',
        'soa_edit_api': '',
        'soa_edit': ''
    }
    r = self.session.put(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    for k in payload.keys():
        self.assertIn(k, data)
        self.assertEqual(data[k], payload[k])
970
def test_zone_rr_update(self):
    # Replace the apex NS rrset (one enabled, one disabled record) and check
    # that exactly those records are returned afterwards.
    name, _, _ = self.create_zone()
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        # NOTE(review): lower-case type is sent while the rrset is read back
        # as 'NS' below -- presumably exercising case-insensitive types; confirm.
        'type': 'ns',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            },
            {
                "content": "ns2-disabled.bar.com.",
                "disabled": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that (only) the new record is there
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertEqual(get_rrset(data, name, 'NS')['records'], rrset['records'])
999
def test_zone_rr_update_mx(self):
    # Important to test with MX records, as they have a priority field, which must end up in the content field.
    name, _, _ = self.create_zone()
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'MX',
        'ttl': 3600,
        'records': [
            {
                "content": "10 mail.example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that (only) the new record is there
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset['records'])
1025
def test_zone_rr_update_invalid_mx(self):
    # An MX record whose target contains '@' must be rejected with a 422,
    # and no MX rrset may exist in the zone afterwards.
    name, _, _ = self.create_zone()
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'MX',
        'ttl': 3600,
        'records': [
            {
                "content": "10 mail@mx.example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('non-hostname content', r.json()['error'])
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertIsNone(get_rrset(data, name, 'MX'))
1050
def test_zone_rr_update_opt(self):
    # Storing an OPT rrset must be rejected with a 422 and an
    # "invalid type" error.
    name, _, _ = self.create_zone()
    # do a replace (= update)
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'OPT',
        'ttl': 3600,
        'records': [
            {
                "content": "9",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('OPT: invalid type given', r.json()['error'])
1073
def test_zone_rr_update_multiple_rrsets(self):
    # Replace the apex NS and MX rrsets in one PATCH and check both results.
    name, _, _ = self.create_zone()
    rrset1 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns9999.example.com.",
                "disabled": False
            }
        ]
    }
    rrset2 = {
        'changetype': 'replace',
        'name': name,
        'type': 'MX',
        'ttl': 3600,
        'records': [
            {
                "content": "10 mx444.example.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset1, rrset2]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # verify that all rrsets have been updated
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    self.assertEqual(get_rrset(data, name, 'NS')['records'], rrset1['records'])
    self.assertEqual(get_rrset(data, name, 'MX')['records'], rrset2['records'])
1111
def test_zone_rr_update_duplicate_record(self):
    # An rrset listing the same record content twice (first and last entry)
    # must be rejected with a 422.
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {"content": "ns9999.example.com.", "disabled": False},
            {"content": "ns9996.example.com.", "disabled": False},
            {"content": "ns9987.example.com.", "disabled": False},
            {"content": "ns9988.example.com.", "disabled": False},
            {"content": "ns9999.example.com.", "disabled": False},
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Duplicate record in RRset', r.json()['error'])
1134
def test_zone_rr_update_duplicate_rrset(self):
    # Two rrsets with the same name and type in one PATCH must be rejected
    # with a 422.
    name, _, _ = self.create_zone()
    rrset1 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns9999.example.com.",
                "disabled": False
            }
        ]
    }
    rrset2 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns9998.example.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset1, rrset2]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Duplicate RRset', r.json()['error'])
1168
def test_zone_rr_delete(self):
    # Delete the apex NS rrset (it is created together with the zone) and
    # check that no NS rrset is returned afterwards.
    name, _, _ = self.create_zone()
    zone_path = "/api/v1/servers/localhost/zones/" + name
    deletion = {
        'changetype': 'delete',
        'name': name,
        'type': 'NS'
    }
    response = self.session.patch(
        self.url(zone_path),
        data=json.dumps({'rrsets': [deletion]}),
        headers={'content-type': 'application/json'})
    self.assert_success(response)
    # the zone must no longer report an NS rrset at the apex
    zone_data = self.session.get(self.url(zone_path)).json()
    remaining = get_rrset(zone_data, name, 'NS')
    self.assertIsNone(remaining)
1186
def test_zone_disable_reenable(self):
    # This also tests that SOA-EDIT-API works.
    name, _, _ = self.create_zone(soa_edit_api='EPOCH')
    # disable zone by disabling SOA
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'SOA',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
                "disabled": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # check SOA serial has been edited (third field of the SOA content)
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    soa_serial1 = get_first_rec(data, name, 'SOA')['content'].split()[2]
    self.assertNotEqual(soa_serial1, '1')
    # make sure domain is still in zone list (disabled SOA!)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
    domains = r.json()
    self.assertEqual(len([domain for domain in domains if domain['name'] == name]), 1)
    # sleep 1sec to ensure the EPOCH value changes for the next request
    time.sleep(1)
    # verify that modifying it still works
    rrset['records'][0]['disabled'] = False
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # check SOA serial has been edited again
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    soa_serial2 = get_first_rec(data, name, 'SOA')['content'].split()[2]
    self.assertNotEqual(soa_serial2, '1')
    self.assertNotEqual(soa_serial2, soa_serial1)
1232
def test_zone_rr_update_out_of_zone(self):
    # Replacing an rrset whose name is not inside the zone must be rejected
    # with a 422.
    name, _, _ = self.create_zone()
    # replace with qname mismatch
    rrset = {
        'changetype': 'replace',
        'name': 'not-in-zone.',
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('out of zone', r.json()['error'])
1255
def test_zone_rr_update_restricted_chars(self):
    # An rrset name containing ':' must be rejected with a 422.
    name, _, _ = self.create_zone()
    # replace with a qname that contains an unsupported character
    rrset = {
        'changetype': 'replace',
        'name': 'test:' + name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('contains unsupported characters', r.json()['error'])
1278
def test_rrset_unknown_type(self):
    # An rrset with a made-up record type must be rejected with a 422.
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'FAFAFA',
        'ttl': 3600,
        'records': [
            {
                "content": "4.3.2.1",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('unknown type', r.json()['error'])
1298
@parameterized.expand([
    ('CNAME', ),
])
def test_rrset_exclusive_and_other(self, qtype):
    # Adding an rrset of the given type at the apex, where the zone already
    # has other rrsets, must be rejected with a 422.
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": "example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Conflicts with pre-existing RRset', r.json()['error'])
1321
@parameterized.expand([
    ('CNAME', ),
])
def test_rrset_other_and_exclusive(self, qtype):
    # First store an rrset of the given type at sub.<zone>, then try to add
    # an A rrset at the same name; the second PATCH must fail with a 422.
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': 'sub.'+name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": "example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assert_success(r)
    rrset = {
        'changetype': 'replace',
        'name': 'sub.'+name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": "1.2.3.4",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Conflicts with pre-existing RRset', r.json()['error'])
1360
@parameterized.expand([
    ('SOA', ['ns1.example.org. test@example.org. 10 10800 3600 604800 3600', 'ns2.example.org. test@example.org. 10 10800 3600 604800 3600']),
    ('CNAME', ['01.example.org.', '02.example.org.']),
])
def test_rrset_single_qtypes(self, qtype, contents):
    # An rrset of the given type carrying two records must be rejected
    # with a 422 ("has more than one record").
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': 'sub.'+name,
        'type': qtype,
        'ttl': 3600,
        'records': [
            {
                "content": contents[0],
                "disabled": False
            },
            {
                "content": contents[1],
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('IN ' + qtype + ' has more than one record', r.json()['error'])
1388
def test_rrset_zone_apex(self):
    # Replace the SOA and add a DNAME at the zone apex in a single PATCH;
    # the request is expected to succeed.
    name, _, _ = self.create_zone()
    apex_records = (
        ('SOA', 'ns1.example.org. test@example.org. 10 10800 3600 604800 3600'),
        ('DNAME', 'example.com.'),
    )
    body = {
        'rrsets': [
            {
                'changetype': 'replace',
                'name': name,
                'type': qtype,
                'ttl': 3600,
                'records': [
                    {
                        "content": content,
                        "disabled": False
                    },
                ]
            }
            for qtype, content in apex_records
        ]
    }
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(body),
        headers={'content-type': 'application/json'})
    # user should be able to create DNAME at APEX as per RFC 6672 section 2.3
    self.assert_success(response)
1420
def test_rrset_ns_dname_exclude(self):
    # Create an NS rrset at delegation.<zone>, then try to add a DNAME at the
    # same (non-apex) name; the second PATCH must fail with a 422.
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': 'delegation.'+name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns.example.org.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assert_success(r)
    rrset = {
        'changetype': 'replace',
        'name': 'delegation.'+name,
        'type': 'DNAME',
        'ttl': 3600,
        'records': [
            {
                "content": "example.com.",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Cannot have both NS and DNAME except in zone apex', r.json()['error'])
1456
1457 ## FIXME: Enable this when it's time for it
1458 # def test_rrset_dname_nothing_under(self):
1459 # name, payload, zone = self.create_zone()
1460 # rrset = {
1461 # 'changetype': 'replace',
1462 # 'name': 'delegation.'+name,
1463 # 'type': 'DNAME',
1464 # 'ttl': 3600,
1465 # 'records': [
1466 # {
1467 # "content": "example.com.",
1468 # "disabled": False
1469 # }
1470 # ]
1471 # }
1472 # payload = {'rrsets': [rrset]}
1473 # r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1474 # headers={'content-type': 'application/json'})
1475 # self.assert_success(r)
1476 # rrset = {
1477 # 'changetype': 'replace',
1478 # 'name': 'sub.delegation.'+name,
1479 # 'type': 'A',
1480 # 'ttl': 3600,
1481 # 'records': [
1482 # {
1483 # "content": "1.2.3.4",
1484 # "disabled": False
1485 # }
1486 # ]
1487 # }
1488 # payload = {'rrsets': [rrset]}
1489 # r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1490 # headers={'content-type': 'application/json'})
1491 # self.assertEquals(r.status_code, 422)
1492 # self.assertIn('You cannot have record(s) under CNAME/DNAME', r.json()['error'])
1493
def test_create_zone_with_leading_space(self):
    # Actual regression: A record content with a leading space must be
    # rejected with a 422 ("Not in expected format").
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": " 4.3.2.1",
                "disabled": False
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
                           headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('Not in expected format', r.json()['error'])
1514
def test_zone_rr_delete_out_of_zone(self):
    # A delete for an rrset whose name is outside the zone is expected to be
    # accepted rather than rejected.
    zone_name, _, _ = self.create_zone()
    body = json.dumps({
        'rrsets': [
            {
                'changetype': 'delete',
                'name': 'not-in-zone.',
                'type': 'NS'
            }
        ]
    })
    response = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + zone_name),
        data=body,
        headers={'content-type': 'application/json'})
    print(response.content)
    # succeed so users can fix their wrong, old data
    self.assert_success(response)
1529
def test_zone_delete(self):
    # Deleting a zone must answer 204 and carry no Content-Type header.
    name, _, _ = self.create_zone()
    r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
    self.assertEqual(r.status_code, 204)
    self.assertNotIn('Content-Type', r.headers)
1535
def test_zone_comment_create(self):
    # Attach two comments to the apex NS rrset without sending any records,
    # then check that comments, records and TTL are all as expected.
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'comments': [
            {
                'account': 'test1',
                'content': 'blah blah',
            },
            {
                'account': 'test2',
                'content': 'blah blah bleh',
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # make sure the comments have been set, and that the NS
    # records are still present
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    serverset = get_rrset(data, name, 'NS')
    print(serverset)
    self.assertNotEqual(serverset['records'], [])
    self.assertNotEqual(serverset['comments'], [])
    # verify that modified_at has been set by pdns
    self.assertNotEqual(serverset['comments'][0]['modified_at'], 0)
    # verify that TTL is correct (regression test)
    self.assertEqual(serverset['ttl'], 3600)
1571
def test_zone_comment_delete(self):
    # Test: Delete ONLY comments.
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'comments': []
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # make sure the NS records are still present
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    serverset = get_rrset(data, name, 'NS')
    print(serverset)
    self.assertNotEqual(serverset['records'], [])
    self.assertEqual(serverset['comments'], [])
1593
def test_zone_comment_out_of_range_modified_at(self):
    # Test that a comment whose 'modified_at' is 4294967297 is rejected with
    # a 422 and an "out of range" error.
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'comments': [
            {
                'account': 'test1',
                'content': 'oh hi there',
                'modified_at': '4294967297'
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn("Value for key 'modified_at' is out of range", r.json()['error'])
1616
def test_zone_comment_stay_intact(self):
    # Test if comments on an rrset stay intact if the rrset is replaced
    name, _, _ = self.create_zone()
    # create a comment
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'comments': [
            {
                'account': 'test1',
                'content': 'oh hi there',
                'modified_at': 1111
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # replace rrset records (this request carries no 'comments' key)
    rrset2 = {
        'changetype': 'replace',
        'name': name,
        'type': 'NS',
        'ttl': 3600,
        'records': [
            {
                "content": "ns1.bar.com.",
                "disabled": False
            }
        ]
    }
    payload2 = {'rrsets': [rrset2]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload2),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    # make sure the comments still exist
    data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
    serverset = get_rrset(data, name, 'NS')
    print(serverset)
    self.assertEqual(serverset['records'], rrset2['records'])
    self.assertEqual(serverset['comments'], rrset['comments'])
1664
def test_zone_auto_ptr_ipv4_create(self):
    # Create a zone whose A record is flagged with "set-ptr" and check that a
    # matching PTR rrset shows up in the pre-created reverse zone.
    revzone = '4.2.192.in-addr.arpa.'
    _, _, revzonedata = self.create_zone(name=revzone)
    name = unique_zone_name()
    rrset = {
        "name": name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "192.2.4.44",
            "disabled": False,
            "set-ptr": True,
        }],
    }
    name, _, data = self.create_zone(name=name, rrsets=[rrset])
    # drop the request-only flag before comparing with the returned records
    del rrset['records'][0]['set-ptr']
    self.assertEqual(get_rrset(data, name, 'A')['records'], rrset['records'])
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
    revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
    print(revsets)
    self.assertEqual(revsets, [{
        u'name': u'44.4.2.192.in-addr.arpa.',
        u'ttl': 3600,
        u'type': u'PTR',
        u'comments': [],
        u'records': [{
            u'content': name,
            u'disabled': False,
        }],
    }])
    # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
    self.assertGreater(r['serial'], revzonedata['serial'])
1697
def test_zone_auto_ptr_ipv4_update(self):
    # PATCH an A record flagged with "set-ptr" into an existing zone and check
    # that a matching PTR rrset shows up in the pre-created reverse zone.
    revzone = '0.2.192.in-addr.arpa.'
    _, _, revzonedata = self.create_zone(name=revzone)
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'A',
        'ttl': 3600,
        'records': [
            {
                "content": '192.2.0.2',
                "disabled": False,
                "set-ptr": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
    revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
    print(revsets)
    self.assertEqual(revsets, [{
        u'name': u'2.0.2.192.in-addr.arpa.',
        u'ttl': 3600,
        u'type': u'PTR',
        u'comments': [],
        u'records': [{
            u'content': name,
            u'disabled': False,
        }],
    }])
    # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
    self.assertGreater(r['serial'], revzonedata['serial'])
1736
def test_zone_auto_ptr_ipv6_update(self):
    # PATCH an AAAA record flagged with "set-ptr" into an existing zone and
    # check that a matching PTR rrset shows up in the ip6.arpa reverse zone.
    # 2001:DB8::bb:aa
    revzone = '8.b.d.0.1.0.0.2.ip6.arpa.'
    _, _, revzonedata = self.create_zone(name=revzone)
    name, _, _ = self.create_zone()
    rrset = {
        'changetype': 'replace',
        'name': name,
        'type': 'AAAA',
        'ttl': 3600,
        'records': [
            {
                "content": '2001:DB8::bb:aa',
                "disabled": False,
                "set-ptr": True
            }
        ]
    }
    payload = {'rrsets': [rrset]}
    r = self.session.patch(
        self.url("/api/v1/servers/localhost/zones/" + name),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    self.assert_success(r)
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
    revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
    print(revsets)
    self.assertEqual(revsets, [{
        u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
        u'ttl': 3600,
        u'type': u'PTR',
        u'comments': [],
        u'records': [{
            u'content': name,
            u'disabled': False,
        }],
    }])
    # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
    self.assertGreater(r['serial'], revzonedata['serial'])
1776
def test_search_rr_exact_zone(self):
    # Search for the exact zone name (without trailing dot) and expect the
    # zone object followed by its two NS records and its SOA record.
    name = unique_zone_name()
    self.create_zone(name=name, serial=22, soa_edit_api='')
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')))
    self.assert_success_json(r)
    print(r.json())
    self.assertEqual(r.json(), [
        {u'object_type': u'zone', u'name': name, u'zone_id': name},
        {u'content': u'ns1.example.com.',
         u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
         u'ttl': 3600, u'type': u'NS', u'name': name},
        {u'content': u'ns2.example.com.',
         u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
         u'ttl': 3600, u'type': u'NS', u'name': name},
        {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
         u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
         u'ttl': 3600, u'type': u'SOA', u'name': name},
    ])
1795
def test_search_rr_exact_zone_filter_type_zone(self):
    # Same search as the exact-zone case, but restricted with
    # object_type=zone: only the zone object may be returned.
    name = unique_zone_name()
    data_type = "zone"
    self.create_zone(name=name, serial=22, soa_edit_api='')
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
    self.assert_success_json(r)
    print(r.json())
    self.assertEqual(r.json(), [
        {u'object_type': u'zone', u'name': name, u'zone_id': name},
    ])
1806
def test_search_rr_exact_zone_filter_type_record(self):
    # Same search as the exact-zone case, but restricted with
    # object_type=record: only the NS and SOA records may be returned.
    name = unique_zone_name()
    data_type = "record"
    self.create_zone(name=name, serial=22, soa_edit_api='')
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
    self.assert_success_json(r)
    print(r.json())
    self.assertEqual(r.json(), [
        {u'content': u'ns1.example.com.',
         u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
         u'ttl': 3600, u'type': u'NS', u'name': name},
        {u'content': u'ns2.example.com.',
         u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
         u'ttl': 3600, u'type': u'NS', u'name': name},
        {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
         u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
         u'ttl': 3600, u'type': u'SOA', u'name': name},
    ])
1825
def test_search_rr_substring(self):
    # Search with a wildcard-wrapped inner slice of the zone name.
    name = unique_zone_name()
    # drop five characters from each end so only a substring is searched for
    search = name[5:-5]
    self.create_zone(name=name)
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
    self.assert_success_json(r)
    print(r.json())
    # should return zone, SOA, ns1, ns2
    self.assertEqual(len(r.json()), 4)
1835
def test_search_rr_case_insensitive(self):
    # The zone name ends in lower-case 'testsuffix.' while the query uses
    # 'testSUFFIX'; the search is still expected to match.
    name = unique_zone_name()+'testsuffix.'
    self.create_zone(name=name)
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
    self.assert_success_json(r)
    print(r.json())
    # should return zone, SOA, ns1, ns2
    self.assertEqual(len(r.json()), 4)
1844
def test_search_after_rectify_with_ent(self):
    # Create a zone with a record two labels below the apex, rectify it with
    # pdnsutil, and check the number of search hits afterwards.
    name = unique_zone_name()
    # first label of the zone name is used as the search term
    search = name.split('.')[0]
    rrset = {
        "name": 'sub.sub.' + name,
        "type": "A",
        "ttl": 3600,
        "records": [{
            "content": "4.3.2.1",
            "disabled": False,
        }],
    }
    self.create_zone(name=name, rrsets=[rrset])
    pdnsutil_rectify(name)
    r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
    self.assert_success_json(r)
    print(r.json())
    # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
    self.assertEqual(len(r.json()), 5)
1864
def test_default_api_rectify(self):
    # Create a DNSSEC/NSEC3 zone without passing api_rectify and expect the
    # first stored AAAA record to have an ordername set.
    name = unique_zone_name()
    rrsets = [
        {
            "name": 'a.' + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::1",
                "disabled": False,
            }],
        },
        {
            "name": 'b.' + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": "2001:DB8::2",
                "disabled": False,
            }],
        },
    ]
    self.create_zone(name=name, rrsets=rrsets, dnssec=True, nsec3param='1 0 1 ab')
    dbrecs = get_db_records(name, 'AAAA')
    self.assertIsNotNone(dbrecs[0]['ordername'])
1891
def test_override_api_rectify(self):
    # Create a DNSSEC (NSEC3) zone with api_rectify=False and check that the
    # first AAAA row read back via get_db_records has no 'ordername'.
    # NOTE(review): a None ordername is presumably the sign that no rectify
    # ran on creation -- confirm against the server implementation.
    name = unique_zone_name()
    rrsets = [
        {
            "name": label + name,
            "type": "AAAA",
            "ttl": 3600,
            "records": [{
                "content": address,
                "disabled": False,
            }],
        }
        for label, address in (('a.', "2001:DB8::1"), ('b.', "2001:DB8::2"))
    ]
    self.create_zone(name=name, rrsets=rrsets, api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
    dbrecs = get_db_records(name, 'AAAA')
    # Fail with a readable message instead of an IndexError when no rows
    # come back.
    self.assertTrue(dbrecs, "no AAAA records found in the database for " + name)
    self.assertIsNone(dbrecs[0]['ordername'])
1918
def test_cname_at_ent_place(self):
    # In a DNSSEC zone created with api_rectify=True, first add an A record
    # at sub2.sub1.<zone>, which leaves sub1.<zone> without records of its
    # own, then place a CNAME at sub1.<zone>. Both PATCH requests must be
    # answered with 204.
    name, _, zone = self.create_zone(dnssec=True, api_rectify=True)
    # Order matters: the A record has to exist before the CNAME is added.
    rrsets = [
        {
            'changetype': 'replace',
            'name': 'sub2.sub1.' + name,
            'type': "A",
            'ttl': 3600,
            'records': [{
                'content': "4.3.2.1",
                'disabled': False,
            }],
        },
        {
            'changetype': 'replace',
            'name': 'sub1.' + name,
            'type': "CNAME",
            'ttl': 3600,
            'records': [{
                'content': "www.example.org.",
                'disabled': False,
            }],
        },
    ]
    # Each rrset is sent in its own PATCH request.
    for rrset in rrsets:
        payload = {'rrsets': [rrset]}
        r = self.session.patch(
            self.url("/api/v1/servers/localhost/zones/" + zone['id']),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assertEqual(r.status_code, 204)
1953
def test_rrset_parameter_post_false(self):
    # POSTing a new zone with ?rrsets=false must answer 201 and the returned
    # JSON must not carry an 'rrsets' member.
    name = unique_zone_name()
    payload = {
        'name': name,
        'kind': 'Native',
        'nameservers': ['ns1.example.com.', 'ns2.example.com.']
    }
    r = self.session.post(
        self.url("/api/v1/servers/localhost/zones?rrsets=false"),
        data=json.dumps(payload),
        headers={'content-type': 'application/json'})
    # Check the response before decoding and printing it, in the same order
    # as the neighbouring tests.
    self.assert_success_json(r)
    data = r.json()
    print(data)
    self.assertEqual(r.status_code, 201)
    self.assertIsNone(data.get('rrsets'))
1969
def test_rrset_false_parameter(self):
    # Fetching a zone with ?rrsets=false must return JSON without an
    # 'rrsets' member.
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + "?rrsets=false"))
    self.assert_success_json(r)
    data = r.json()
    print(data)
    self.assertIsNone(data.get('rrsets'))
1977
def test_rrset_true_parameter(self):
    # Fetching a freshly created Native zone with ?rrsets=true must return
    # exactly two rrsets.
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + "?rrsets=true"))
    self.assert_success_json(r)
    data = r.json()
    print(data)
    self.assertEqual(len(data.get('rrsets')), 2)
1985
def test_wrong_rrset_parameter(self):
    # An unsupported value for the 'rrsets' query parameter must be rejected
    # with 422 and an error message naming the offending value.
    name = unique_zone_name()
    self.create_zone(name=name, kind='Native')
    r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + "?rrsets=foobar"))
    self.assertEqual(r.status_code, 422)
    self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
1992
def test_put_master_tsig_key_ids_non_existent(self):
    # A PUT that sets 'master_tsig_key_ids' to a key name this test never
    # created must be rejected with 422 and a TSIG-key error message.
    name = unique_zone_name()
    # The first label of a second unique name serves as the key name.
    keyname = unique_zone_name().split('.')[0]
    self.create_zone(name=name, kind='Native')
    payload = {
        'master_tsig_key_ids': [keyname]
    }
    r = self.session.put(self.url('/api/v1/servers/localhost/zones/' + name),
                         data=json.dumps(payload),
                         headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('A TSIG key with the name', r.json()['error'])
2005
def test_put_slave_tsig_key_ids_non_existent(self):
    # A PUT that sets 'slave_tsig_key_ids' to a key name this test never
    # created must be rejected with 422 and a TSIG-key error message.
    name = unique_zone_name()
    # The first label of a second unique name serves as the key name.
    keyname = unique_zone_name().split('.')[0]
    self.create_zone(name=name, kind='Native')
    payload = {
        'slave_tsig_key_ids': [keyname]
    }
    r = self.session.put(self.url('/api/v1/servers/localhost/zones/' + name),
                         data=json.dumps(payload),
                         headers={'content-type': 'application/json'})
    self.assertEqual(r.status_code, 422)
    self.assertIn('A TSIG key with the name', r.json()['error'])
2018
2019
@unittest.skipIf(not is_auth(), "Not applicable")
class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
    """API tests for the root zone ('.'), addressed by the zone id '=2E'.

    Skipped unless is_auth() is true.
    """

    def setUp(self):
        super(AuthRootZone, self).setUp()
        # zone name is not unique, so delete the zone before each individual test.
        self.session.delete(self.url("/api/v1/servers/localhost/zones/=2E"))

    def test_create_zone(self):
        # Create the root zone, then check the creation response, the
        # generated SOA, the zone listing and a direct fetch by id.
        _, payload, data = self.create_zone(name='.', serial=22, soa_edit_api='')
        for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
            self.assertIn(k, data)
            # Only fields that were part of the request can be compared.
            if k in payload:
                self.assertEqual(data[k], payload[k])
        # validate generated SOA
        rec = get_first_rec(data, '.', 'SOA')
        # get_first_rec returns None when no matching rrset exists; report
        # that as a failure rather than a TypeError on the lookup below.
        self.assertIsNotNone(rec)
        self.assertEqual(
            rec['content'],
            "a.misconfigured.powerdns.server. hostmaster. " + str(payload['serial']) +
            " 10800 3600 604800 3600"
        )
        # Regression test: verify zone list works
        zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
        print("zonelist:", zonelist)
        self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
        # Also test that fetching the zone works.
        print("id:", data['id'])
        self.assertEqual(data['id'], '=2E')
        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
        print("zone (fetched):", data)
        for k in ('name', 'kind'):
            self.assertIn(k, data)
            self.assertEqual(data[k], payload[k])
        self.assertEqual(data['rrsets'][0]['name'], '.')

    def test_update_zone(self):
        # Apply two successive PUT updates to the root zone and check after
        # each one that a GET returns every field that was sent.
        self.create_zone(name='.')
        zone_path = "/api/v1/servers/localhost/zones/" + '=2E'
        updates = [
            # update, set as Master and enable SOA-EDIT-API
            {
                'kind': 'Master',
                'masters': ['192.0.2.1', '192.0.2.2'],
                'soa_edit_api': 'EPOCH',
                'soa_edit': 'EPOCH'
            },
            # update, back to Native and empty(off)
            {
                'kind': 'Native',
                'soa_edit_api': '',
                'soa_edit': ''
            },
        ]
        for payload in updates:
            r = self.session.put(
                self.url(zone_path),
                data=json.dumps(payload),
                headers={'content-type': 'application/json'})
            self.assert_success(r)
            data = self.session.get(self.url(zone_path)).json()
            for k in payload:
                self.assertIn(k, data)
                self.assertEqual(data[k], payload[k])
2089
2090
@unittest.skipIf(not is_recursor(), "Not applicable")
class RecursorZones(ApiTestCase):
    """API tests for the zones and search-data endpoints.

    Skipped unless is_recursor() is true.
    """

    def create_zone(self, name=None, kind=None, rd=False, servers=None):
        # POST a new zone and return (payload, parsed JSON response).
        # A unique name is generated when none is given, and `servers`
        # defaults to an empty list.
        if name is None:
            name = unique_zone_name()
        if servers is None:
            servers = []
        payload = {
            'name': name,
            'kind': kind,
            'servers': servers,
            'recursion_desired': rd
        }
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success_json(r)
        return payload, r.json()

    def _assert_fields_match(self, data, expected):
        # Every key of `expected` must map to an equal value in `data`.
        for k in expected:
            self.assertEqual(data[k], expected[k])

    def test_create_auth_zone(self):
        payload, data = self.create_zone(kind='Native')
        self._assert_fields_match(data, payload)

    def test_create_zone_no_name(self):
        # An empty zone name must be rejected with 422.
        payload = {
            'name': '',
            'kind': 'Native',
            'servers': ['8.8.8.8'],
            'recursion_desired': False,
        }
        print(payload)
        r = self.session.post(
            self.url("/api/v1/servers/localhost/zones"),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assertEqual(r.status_code, 422)
        self.assertIn('is not canonical', r.json()['error'])

    def test_create_forwarded_zone(self):
        payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
        # return values are normalized
        payload['servers'][0] += ':53'
        self._assert_fields_match(data, payload)

    def test_create_forwarded_rd_zone(self):
        payload, data = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
        # return values are normalized
        payload['servers'][0] += ':53'
        self._assert_fields_match(data, payload)

    def test_create_auth_zone_with_symbols(self):
        payload, data = self.create_zone(name='foo/bar.' + unique_zone_name(), kind='Native')
        # The returned id is expected to carry '/' encoded as '=2F'.
        expected_id = payload['name'].replace('/', '=2F')
        self._assert_fields_match(data, payload)
        self.assertEqual(data['id'], expected_id)

    def test_rename_auth_zone(self):
        payload, _ = self.create_zone(kind='Native')
        name = payload['name']
        # now rename it
        payload = {
            'name': 'renamed-' + name,
            'kind': 'Native',
            'recursion_desired': False
        }
        r = self.session.put(
            self.url("/api/v1/servers/localhost/zones/" + name),
            data=json.dumps(payload),
            headers={'content-type': 'application/json'})
        self.assert_success(r)
        # The zone must now be retrievable under its new name.
        data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + payload['name'])).json()
        self._assert_fields_match(data, payload)

    def test_zone_delete(self):
        # Deleting a zone must answer 204 without a Content-Type header.
        payload, _ = self.create_zone(kind='Native')
        name = payload['name']
        r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
        self.assertEqual(r.status_code, 204)
        self.assertNotIn('Content-Type', r.headers)

    def test_search_rr_exact_zone(self):
        # Searching for the full zone name must return one 'zone' entry.
        name = unique_zone_name()
        self.create_zone(name=name, kind='Native')
        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name))
        self.assert_success_json(r)
        results = r.json()
        print(results)
        self.assertEqual(results, [{u'type': u'zone', u'name': name, u'zone_id': name}])

    def test_search_rr_substring(self):
        name = 'search-rr-zone.name.'
        self.create_zone(name=name, kind='Native')
        r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
        self.assert_success_json(r)
        results = r.json()
        print(results)
        # should return zone, SOA
        self.assertEqual(len(results), 2)
2194
2195 @unittest.skipIf(not is_auth(), "Not applicable")
2196 class AuthZoneKeys(ApiTestCase, AuthZonesHelperMixin):
2197
2198 def test_get_keys(self):
2199 r = self.session.get(
2200 self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2201 self.assert_success_json(r)
2202 keys = r.json()
2203 self.assertGreater(len(keys), 0)
2204
2205 key0 = deepcopy(keys[0])
2206 del key0['dnskey']
2207 del key0['ds']
2208 expected = {
2209 u'algorithm': u'ECDSAP256SHA256',
2210 u'bits': 256,
2211 u'active': True,
2212 u'type': u'Cryptokey',
2213 u'keytype': u'csk',
2214 u'flags': 257,
2215 u'published': True,
2216 u'id': 1}
2217 self.assertEquals(key0, expected)
2218
2219 keydata = keys[0]['dnskey'].split()
2220 self.assertEqual(len(keydata), 4)