]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Zones.py
Merge pull request #7537 from andreydomas/dnsnameset
[thirdparty/pdns.git] / regression-tests.api / test_Zones.py
1 from __future__ import print_function
2 import json
3 import time
4 import unittest
5 from copy import deepcopy
6 from parameterized import parameterized
7 from pprint import pprint
8 from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor, get_db_records, pdnsutil_rectify
9
10
11 def get_rrset(data, qname, qtype):
12 for rrset in data['rrsets']:
13 if rrset['name'] == qname and rrset['type'] == qtype:
14 return rrset
15 return None
16
17
18 def get_first_rec(data, qname, qtype):
19 rrset = get_rrset(data, qname, qtype)
20 if rrset:
21 return rrset['records'][0]
22 return None
23
24
25 def eq_zone_rrsets(rrsets, expected):
26 data_got = {}
27 data_expected = {}
28 for type_, expected_records in expected.items():
29 type_ = str(type_)
30 data_got[type_] = set()
31 data_expected[type_] = set()
32 uses_name = any(['name' in expected_record for expected_record in expected_records])
33 # minify + convert received data
34 for rrset in [rrset for rrset in rrsets if rrset['type'] == type_]:
35 print(rrset)
36 for r in rrset['records']:
37 data_got[type_].add((rrset['name'] if uses_name else '@', rrset['type'], r['content']))
38 # minify expected data
39 for r in expected_records:
40 data_expected[type_].add((r['name'] if uses_name else '@', type_, r['content']))
41
42 print("eq_zone_rrsets: got:")
43 pprint(data_got)
44 print("eq_zone_rrsets: expected:")
45 pprint(data_expected)
46
47 assert data_got == data_expected, "%r != %r" % (data_got, data_expected)
48
49
50 class Zones(ApiTestCase):
51
52 def test_list_zones(self):
53 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
54 self.assert_success_json(r)
55 domains = r.json()
56 example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
57 self.assertEquals(len(example_com), 1)
58 example_com = example_com[0]
59 print(example_com)
60 required_fields = ['id', 'url', 'name', 'kind']
61 if is_auth():
62 required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial', 'account']
63 self.assertNotEquals(example_com['serial'], 0)
64 elif is_recursor():
65 required_fields = required_fields + ['recursion_desired', 'servers']
66 for field in required_fields:
67 self.assertIn(field, example_com)
68
69
70 class AuthZonesHelperMixin(object):
71 def create_zone(self, name=None, **kwargs):
72 if name is None:
73 name = unique_zone_name()
74 payload = {
75 'name': name,
76 'kind': 'Native',
77 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
78 }
79 for k, v in kwargs.items():
80 if v is None:
81 del payload[k]
82 else:
83 payload[k] = v
84 print("sending", payload)
85 r = self.session.post(
86 self.url("/api/v1/servers/localhost/zones"),
87 data=json.dumps(payload),
88 headers={'content-type': 'application/json'})
89 self.assert_success_json(r)
90 self.assertEquals(r.status_code, 201)
91 reply = r.json()
92 print("reply", reply)
93 return name, payload, reply
94
95
96 @unittest.skipIf(not is_auth(), "Not applicable")
97 class AuthZones(ApiTestCase, AuthZonesHelperMixin):
98
99 def test_create_zone(self):
100 # soa_edit_api has a default, override with empty for this test
101 name, payload, data = self.create_zone(serial=22, soa_edit_api='')
102 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
103 self.assertIn(k, data)
104 if k in payload:
105 self.assertEquals(data[k], payload[k])
106 # validate generated SOA
107 expected_soa = "a.misconfigured.powerdns.server. hostmaster." + name + " " + \
108 str(payload['serial']) + " 10800 3600 604800 3600"
109 self.assertEquals(
110 get_first_rec(data, name, 'SOA')['content'],
111 expected_soa
112 )
113 # Because we had confusion about dots, check that the DB is without dots.
114 dbrecs = get_db_records(name, 'SOA')
115 self.assertEqual(dbrecs[0]['content'], expected_soa.replace('. ', ' '))
116
117 def test_create_zone_with_soa_edit_api(self):
118 # soa_edit_api wins over serial
119 name, payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
120 for k in ('soa_edit_api', ):
121 self.assertIn(k, data)
122 if k in payload:
123 self.assertEquals(data[k], payload[k])
124 # generated EPOCH serial surely is > fixed serial we passed in
125 print(data)
126 self.assertGreater(data['serial'], payload['serial'])
127 soa_serial = int(get_first_rec(data, name, 'SOA')['content'].split(' ')[2])
128 self.assertGreater(soa_serial, payload['serial'])
129 self.assertEquals(soa_serial, data['serial'])
130
131 def test_create_zone_with_account(self):
132 # soa_edit_api wins over serial
133 name, payload, data = self.create_zone(account='anaccount', serial=10)
134 print(data)
135 for k in ('account', ):
136 self.assertIn(k, data)
137 if k in payload:
138 self.assertEquals(data[k], payload[k])
139
140 def test_create_zone_default_soa_edit_api(self):
141 name, payload, data = self.create_zone()
142 print(data)
143 self.assertEquals(data['soa_edit_api'], 'DEFAULT')
144
145 def test_create_zone_exists(self):
146 name, payload, data = self.create_zone()
147 print(data)
148 payload = {
149 'name': name,
150 'kind': 'Native'
151 }
152 print(payload)
153 r = self.session.post(
154 self.url("/api/v1/servers/localhost/zones"),
155 data=json.dumps(payload),
156 headers={'content-type': 'application/json'})
157 self.assertEquals(r.status_code, 409) # Conflict - already exists
158
159 def test_create_zone_with_soa_edit(self):
160 name, payload, data = self.create_zone(soa_edit='INCEPTION-INCREMENT', soa_edit_api='SOA-EDIT-INCREASE')
161 print(data)
162 self.assertEquals(data['soa_edit'], 'INCEPTION-INCREMENT')
163 self.assertEquals(data['soa_edit_api'], 'SOA-EDIT-INCREASE')
164 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
165 # These particular settings lead to the first serial set to YYYYMMDD01.
166 self.assertEquals(soa_serial[-2:], '01')
167 rrset = {
168 'changetype': 'replace',
169 'name': name,
170 'type': 'A',
171 'ttl': 3600,
172 'records': [
173 {
174 "content": "127.0.0.1",
175 "disabled": False
176 }
177 ]
178 }
179 payload = {'rrsets': [rrset]}
180 self.session.patch(
181 self.url("/api/v1/servers/localhost/zones/" + data['id']),
182 data=json.dumps(payload),
183 headers={'content-type': 'application/json'})
184 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
185 data = r.json()
186 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
187 self.assertEquals(soa_serial[-2:], '02')
188
189 def test_create_zone_with_records(self):
190 name = unique_zone_name()
191 rrset = {
192 "name": name,
193 "type": "A",
194 "ttl": 3600,
195 "records": [{
196 "content": "4.3.2.1",
197 "disabled": False,
198 }],
199 }
200 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
201 # check our record has appeared
202 self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
203
204 def test_create_zone_with_wildcard_records(self):
205 name = unique_zone_name()
206 rrset = {
207 "name": "*."+name,
208 "type": "A",
209 "ttl": 3600,
210 "records": [{
211 "content": "4.3.2.1",
212 "disabled": False,
213 }],
214 }
215 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
216 # check our record has appeared
217 self.assertEquals(get_rrset(data, rrset['name'], 'A')['records'], rrset['records'])
218
219 def test_create_zone_with_comments(self):
220 name = unique_zone_name()
221 rrsets = [
222 {
223 "name": name,
224 "type": "soa", # test uppercasing of type, too.
225 "comments": [{
226 "account": "test1",
227 "content": "blah blah",
228 "modified_at": 11112,
229 }],
230 },
231 {
232 "name": name,
233 "type": "AAAA",
234 "ttl": 3600,
235 "records": [{
236 "content": "2001:DB8::1",
237 "disabled": False,
238 }],
239 "comments": [{
240 "account": "test AAAA",
241 "content": "blah blah AAAA",
242 "modified_at": 11112,
243 }],
244 },
245 {
246 "name": name,
247 "type": "TXT",
248 "ttl": 3600,
249 "records": [{
250 "content": "\"test TXT\"",
251 "disabled": False,
252 }],
253 },
254 {
255 "name": name,
256 "type": "A",
257 "ttl": 3600,
258 "records": [{
259 "content": "192.0.2.1",
260 "disabled": False,
261 }],
262 },
263 ]
264 name, payload, data = self.create_zone(name=name, rrsets=rrsets)
265 # NS records have been created
266 self.assertEquals(len(data['rrsets']), len(rrsets) + 1)
267 # check our comment has appeared
268 self.assertEquals(get_rrset(data, name, 'SOA')['comments'], rrsets[0]['comments'])
269 self.assertEquals(get_rrset(data, name, 'A')['comments'], [])
270 self.assertEquals(get_rrset(data, name, 'TXT')['comments'], [])
271 self.assertEquals(get_rrset(data, name, 'AAAA')['comments'], rrsets[1]['comments'])
272
273 def test_create_zone_uncanonical_nameservers(self):
274 name = unique_zone_name()
275 payload = {
276 'name': name,
277 'kind': 'Native',
278 'nameservers': ['uncanon.example.com']
279 }
280 print(payload)
281 r = self.session.post(
282 self.url("/api/v1/servers/localhost/zones"),
283 data=json.dumps(payload),
284 headers={'content-type': 'application/json'})
285 self.assertEquals(r.status_code, 422)
286 self.assertIn('Nameserver is not canonical', r.json()['error'])
287
288 def test_create_auth_zone_no_name(self):
289 name = unique_zone_name()
290 payload = {
291 'name': '',
292 'kind': 'Native',
293 }
294 print(payload)
295 r = self.session.post(
296 self.url("/api/v1/servers/localhost/zones"),
297 data=json.dumps(payload),
298 headers={'content-type': 'application/json'})
299 self.assertEquals(r.status_code, 422)
300 self.assertIn('is not canonical', r.json()['error'])
301
302 def test_create_zone_with_custom_soa(self):
303 name = unique_zone_name()
304 content = u"ns1.example.net. testmaster@example.net. 10 10800 3600 604800 3600"
305 rrset = {
306 "name": name,
307 "type": "soa", # test uppercasing of type, too.
308 "ttl": 3600,
309 "records": [{
310 "content": content,
311 "disabled": False,
312 }],
313 }
314 name, payload, data = self.create_zone(name=name, rrsets=[rrset], soa_edit_api='')
315 self.assertEquals(get_rrset(data, name, 'SOA')['records'], rrset['records'])
316 dbrecs = get_db_records(name, 'SOA')
317 self.assertEqual(dbrecs[0]['content'], content.replace('. ', ' '))
318
319 def test_create_zone_double_dot(self):
320 name = 'test..' + unique_zone_name()
321 payload = {
322 'name': name,
323 'kind': 'Native',
324 'nameservers': ['ns1.example.com.']
325 }
326 print(payload)
327 r = self.session.post(
328 self.url("/api/v1/servers/localhost/zones"),
329 data=json.dumps(payload),
330 headers={'content-type': 'application/json'})
331 self.assertEquals(r.status_code, 422)
332 self.assertIn('Unable to parse DNS Name', r.json()['error'])
333
334 def test_create_zone_restricted_chars(self):
335 name = 'test:' + unique_zone_name() # : isn't good as a name.
336 payload = {
337 'name': name,
338 'kind': 'Native',
339 'nameservers': ['ns1.example.com']
340 }
341 print(payload)
342 r = self.session.post(
343 self.url("/api/v1/servers/localhost/zones"),
344 data=json.dumps(payload),
345 headers={'content-type': 'application/json'})
346 self.assertEquals(r.status_code, 422)
347 self.assertIn('contains unsupported characters', r.json()['error'])
348
349 def test_create_zone_mixed_nameservers_ns_rrset_zonelevel(self):
350 name = unique_zone_name()
351 rrset = {
352 "name": name,
353 "type": "NS",
354 "ttl": 3600,
355 "records": [{
356 "content": "ns2.example.com.",
357 "disabled": False,
358 }],
359 }
360 payload = {
361 'name': name,
362 'kind': 'Native',
363 'nameservers': ['ns1.example.com.'],
364 'rrsets': [rrset],
365 }
366 print(payload)
367 r = self.session.post(
368 self.url("/api/v1/servers/localhost/zones"),
369 data=json.dumps(payload),
370 headers={'content-type': 'application/json'})
371 self.assertEquals(r.status_code, 422)
372 self.assertIn('Nameservers list MUST NOT be mixed with zone-level NS in rrsets', r.json()['error'])
373
374 def test_create_zone_mixed_nameservers_ns_rrset_below_zonelevel(self):
375 name = unique_zone_name()
376 rrset = {
377 "name": 'subzone.'+name,
378 "type": "NS",
379 "ttl": 3600,
380 "records": [{
381 "content": "ns2.example.com.",
382 "disabled": False,
383 }],
384 }
385 payload = {
386 'name': name,
387 'kind': 'Native',
388 'nameservers': ['ns1.example.com.'],
389 'rrsets': [rrset],
390 }
391 print(payload)
392 r = self.session.post(
393 self.url("/api/v1/servers/localhost/zones"),
394 data=json.dumps(payload),
395 headers={'content-type': 'application/json'})
396 self.assert_success_json(r)
397
398 def test_create_zone_with_symbols(self):
399 name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
400 name = payload['name']
401 expected_id = name.replace('/', '=2F')
402 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
403 self.assertIn(k, data)
404 if k in payload:
405 self.assertEquals(data[k], payload[k])
406 self.assertEquals(data['id'], expected_id)
407 dbrecs = get_db_records(name, 'SOA')
408 self.assertEqual(dbrecs[0]['name'], name.rstrip('.'))
409
410 def test_create_zone_with_nameservers_non_string(self):
411 # ensure we don't crash
412 name = unique_zone_name()
413 payload = {
414 'name': name,
415 'kind': 'Native',
416 'nameservers': [{'a': 'ns1.example.com'}] # invalid
417 }
418 print(payload)
419 r = self.session.post(
420 self.url("/api/v1/servers/localhost/zones"),
421 data=json.dumps(payload),
422 headers={'content-type': 'application/json'})
423 self.assertEquals(r.status_code, 422)
424
425 def test_create_zone_with_dnssec(self):
426 """
427 Create a zone with "dnssec" set and see if a key was made.
428 """
429 name = unique_zone_name()
430 name, payload, data = self.create_zone(dnssec=True)
431
432 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
433
434 for k in ('dnssec', ):
435 self.assertIn(k, data)
436 if k in payload:
437 self.assertEquals(data[k], payload[k])
438
439 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
440
441 keys = r.json()
442
443 print(keys)
444
445 self.assertEquals(r.status_code, 200)
446 self.assertEquals(len(keys), 1)
447 self.assertEquals(keys[0]['type'], 'Cryptokey')
448 self.assertEquals(keys[0]['active'], True)
449 self.assertEquals(keys[0]['keytype'], 'csk')
450
451 def test_create_zone_with_dnssec_disable_dnssec(self):
452 """
453 Create a zone with "dnssec", then set "dnssec" to false and see if the
454 keys are gone
455 """
456 name = unique_zone_name()
457 name, payload, data = self.create_zone(dnssec=True)
458
459 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
460 data=json.dumps({'dnssec': False}))
461 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
462
463 zoneinfo = r.json()
464
465 self.assertEquals(r.status_code, 200)
466 self.assertEquals(zoneinfo['dnssec'], False)
467
468 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/cryptokeys'))
469
470 keys = r.json()
471
472 self.assertEquals(r.status_code, 200)
473 self.assertEquals(len(keys), 0)
474
475 def test_create_zone_with_nsec3param(self):
476 """
477 Create a zone with "nsec3param" set and see if the metadata was added.
478 """
479 name = unique_zone_name()
480 nsec3param = '1 0 500 aabbccddeeff'
481 name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param)
482
483 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
484
485 for k in ('dnssec', 'nsec3param'):
486 self.assertIn(k, data)
487 if k in payload:
488 self.assertEquals(data[k], payload[k])
489
490 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3PARAM'))
491
492 data = r.json()
493
494 print(data)
495
496 self.assertEquals(r.status_code, 200)
497 self.assertEquals(len(data['metadata']), 1)
498 self.assertEquals(data['kind'], 'NSEC3PARAM')
499 self.assertEquals(data['metadata'][0], nsec3param)
500
501 def test_create_zone_with_nsec3narrow(self):
502 """
503 Create a zone with "nsec3narrow" set and see if the metadata was added.
504 """
505 name = unique_zone_name()
506 nsec3param = '1 0 500 aabbccddeeff'
507 name, payload, data = self.create_zone(dnssec=True, nsec3param=nsec3param,
508 nsec3narrow=True)
509
510 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
511
512 for k in ('dnssec', 'nsec3param', 'nsec3narrow'):
513 self.assertIn(k, data)
514 if k in payload:
515 self.assertEquals(data[k], payload[k])
516
517 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name + '/metadata/NSEC3NARROW'))
518
519 data = r.json()
520
521 print(data)
522
523 self.assertEquals(r.status_code, 200)
524 self.assertEquals(len(data['metadata']), 1)
525 self.assertEquals(data['kind'], 'NSEC3NARROW')
526 self.assertEquals(data['metadata'][0], '1')
527
528 def test_create_zone_dnssec_serial(self):
529 """
530 Create a zone set/unset "dnssec" and see if the serial was increased
531 after every step
532 """
533 name = unique_zone_name()
534 name, payload, data = self.create_zone()
535
536 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
537 self.assertEquals(soa_serial[-2:], '01')
538
539 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
540 data=json.dumps({'dnssec': True}))
541 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
542
543 data = r.json()
544 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
545
546 self.assertEquals(r.status_code, 200)
547 self.assertEquals(soa_serial[-2:], '02')
548
549 self.session.put(self.url("/api/v1/servers/localhost/zones/" + name),
550 data=json.dumps({'dnssec': False}))
551 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name))
552
553 data = r.json()
554 soa_serial = get_first_rec(data, name, 'SOA')['content'].split(' ')[2]
555
556 self.assertEquals(r.status_code, 200)
557 self.assertEquals(soa_serial[-2:], '03')
558
559 def test_zone_absolute_url(self):
560 name, payload, data = self.create_zone()
561 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
562 rdata = r.json()
563 print(rdata[0])
564 self.assertTrue(rdata[0]['url'].startswith('/api/v'))
565
566 def test_create_zone_metadata(self):
567 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
568 r = self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
569 data=json.dumps(payload_metadata))
570 rdata = r.json()
571 self.assertEquals(r.status_code, 201)
572 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
573
574 def test_create_zone_metadata_kind(self):
575 payload_metadata = {"metadata": ["127.0.0.2"]}
576 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"),
577 data=json.dumps(payload_metadata))
578 rdata = r.json()
579 self.assertEquals(r.status_code, 200)
580 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
581
582 def test_create_protected_zone_metadata(self):
583 # test whether it prevents modification of certain kinds
584 for k in ("NSEC3NARROW", "NSEC3PARAM", "PRESIGNED", "LUA-AXFR-SCRIPT"):
585 payload = {"metadata": ["FOO", "BAR"]}
586 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/%s" % k),
587 data=json.dumps(payload))
588 self.assertEquals(r.status_code, 422)
589
590 def test_retrieve_zone_metadata(self):
591 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
592 self.session.post(self.url("/api/v1/servers/localhost/zones/example.com/metadata"),
593 data=json.dumps(payload_metadata))
594 r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata"))
595 rdata = r.json()
596 self.assertEquals(r.status_code, 200)
597 self.assertIn(payload_metadata, rdata)
598
599 def test_delete_zone_metadata(self):
600 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
601 self.assertEquals(r.status_code, 200)
602 r = self.session.get(self.url("/api/v1/servers/localhost/zones/example.com/metadata/AXFR-SOURCE"))
603 rdata = r.json()
604 self.assertEquals(r.status_code, 200)
605 self.assertEquals(rdata["metadata"], [])
606
607 def test_create_external_zone_metadata(self):
608 payload_metadata = {"metadata": ["My very important message"]}
609 r = self.session.put(self.url("/api/v1/servers/localhost/zones/example.com/metadata/X-MYMETA"),
610 data=json.dumps(payload_metadata))
611 self.assertEquals(r.status_code, 200)
612 rdata = r.json()
613 self.assertEquals(rdata["metadata"], payload_metadata["metadata"])
614
615 def test_create_metadata_in_non_existent_zone(self):
616 payload_metadata = {"type": "Metadata", "kind": "AXFR-SOURCE", "metadata": ["127.0.0.2"]}
617 r = self.session.post(self.url("/api/v1/servers/localhost/zones/idonotexist.123.456.example./metadata"),
618 data=json.dumps(payload_metadata))
619 self.assertEquals(r.status_code, 404)
620 # Note: errors should probably contain json (see #5988)
621 # self.assertIn('Could not find domain ', r.json()['error'])
622
623 def test_create_slave_zone(self):
624 # Test that nameservers can be absent for slave zones.
625 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
626 for k in ('name', 'masters', 'kind'):
627 self.assertIn(k, data)
628 self.assertEquals(data[k], payload[k])
629 print("payload:", payload)
630 print("data:", data)
631 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
632 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
633 zonelist = r.json()
634 print("zonelist:", zonelist)
635 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
636 # Also test that fetching the zone works.
637 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id']))
638 data = r.json()
639 print("zone (fetched):", data)
640 for k in ('name', 'masters', 'kind'):
641 self.assertIn(k, data)
642 self.assertEquals(data[k], payload[k])
643 self.assertEqual(data['serial'], 0)
644 self.assertEqual(data['rrsets'], [])
645
646 def test_find_zone_by_name(self):
647 name = 'foo/' + unique_zone_name()
648 name, payload, data = self.create_zone(name=name)
649 r = self.session.get(self.url("/api/v1/servers/localhost/zones?zone=" + name))
650 data = r.json()
651 print(data)
652 self.assertEquals(data[0]['name'], name)
653
654 def test_delete_slave_zone(self):
655 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
656 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + data['id']))
657 r.raise_for_status()
658
659 def test_retrieve_slave_zone(self):
660 name, payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
661 print("payload:", payload)
662 print("data:", data)
663 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/axfr-retrieve"))
664 data = r.json()
665 print("status for axfr-retrieve:", data)
666 self.assertEqual(data['result'], u'Added retrieval request for \'' + payload['name'] +
667 '\' from master 127.0.0.2')
668
669 def test_notify_master_zone(self):
670 name, payload, data = self.create_zone(kind='Master')
671 print("payload:", payload)
672 print("data:", data)
673 r = self.session.put(self.url("/api/v1/servers/localhost/zones/" + data['id'] + "/notify"))
674 data = r.json()
675 print("status for notify:", data)
676 self.assertEqual(data['result'], 'Notification queued')
677
678 def test_get_zone_with_symbols(self):
679 name, payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
680 name = payload['name']
681 zone_id = (name.replace('/', '=2F'))
682 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id))
683 data = r.json()
684 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
685 self.assertIn(k, data)
686 if k in payload:
687 self.assertEquals(data[k], payload[k])
688
689 def test_get_zone(self):
690 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
691 domains = r.json()
692 example_com = [domain for domain in domains if domain['name'] == u'example.com.'][0]
693 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + example_com['id']))
694 self.assert_success_json(r)
695 data = r.json()
696 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
697 self.assertIn(k, data)
698 self.assertEquals(data['name'], 'example.com.')
699
700 def test_import_zone_broken(self):
701 payload = {
702 'name': 'powerdns-broken.com',
703 'kind': 'Master',
704 'nameservers': [],
705 }
706 payload['zone'] = """
707 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
708 flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
709 ;; WARNING: recursion requested but not available
710
711 ;; OPT PSEUDOSECTION:
712 ; EDNS: version: 0, flags:; udp: 1680
713 ;; QUESTION SECTION:
714 ;powerdns.com. IN SOA
715
716 ;; ANSWER SECTION:
717 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
718 powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
719 powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
720 powerdns-broken.com. 86400 IN A 82.94.213.34
721 powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
722 powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
723 powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
724 """
725 r = self.session.post(
726 self.url("/api/v1/servers/localhost/zones"),
727 data=json.dumps(payload),
728 headers={'content-type': 'application/json'})
729 self.assertEquals(r.status_code, 422)
730
731 def test_import_zone_axfr_outofzone(self):
732 # Ensure we don't create out-of-zone records
733 payload = {
734 'name': unique_zone_name(),
735 'kind': 'Master',
736 'nameservers': [],
737 }
738 payload['zone'] = """
739 %NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
740 %NAME% 3600 IN NS powerdnssec2.ds9a.nl.
741 example.org. 3600 IN AAAA 2001:888:2000:1d::2
742 %NAME% 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
743 """.replace('%NAME%', payload['name'])
744 r = self.session.post(
745 self.url("/api/v1/servers/localhost/zones"),
746 data=json.dumps(payload),
747 headers={'content-type': 'application/json'})
748 self.assertEquals(r.status_code, 422)
749 self.assertEqual(r.json()['error'], 'RRset example.org. IN AAAA: Name is out of zone')
750
751 def test_import_zone_axfr(self):
752 payload = {
753 'name': 'powerdns.com.',
754 'kind': 'Master',
755 'nameservers': [],
756 'soa_edit_api': '', # turn off so exact SOA comparison works.
757 }
758 payload['zone'] = """
759 ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
760 ;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
761 ;; WARNING: recursion requested but not available
762
763 ;; OPT PSEUDOSECTION:
764 ; EDNS: version: 0, flags:; udp: 1680
765 ;; QUESTION SECTION:
766 ;powerdns.com. IN SOA
767
768 ;; ANSWER SECTION:
769 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
770 powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
771 powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
772 powerdns.com. 86400 IN A 82.94.213.34
773 powerdns.com. 3600 IN MX 0 xs.powerdns.com.
774 powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
775 powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
776 """
777 r = self.session.post(
778 self.url("/api/v1/servers/localhost/zones"),
779 data=json.dumps(payload),
780 headers={'content-type': 'application/json'})
781 self.assert_success_json(r)
782 data = r.json()
783 self.assertIn('name', data)
784
785 expected = {
786 'NS': [
787 {'content': 'powerdnssec1.ds9a.nl.'},
788 {'content': 'powerdnssec2.ds9a.nl.'},
789 ],
790 'SOA': [
791 {'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800'},
792 ],
793 'MX': [
794 {'content': '0 xs.powerdns.com.'},
795 ],
796 'A': [
797 {'content': '82.94.213.34', 'name': 'powerdns.com.'},
798 ],
799 'AAAA': [
800 {'content': '2001:888:2000:1d::2', 'name': 'powerdns.com.'},
801 ],
802 }
803
804 eq_zone_rrsets(data['rrsets'], expected)
805
806 # check content in DB is stored WITHOUT trailing dot.
807 dbrecs = get_db_records(payload['name'], 'NS')
808 dbrec = next((dbrec for dbrec in dbrecs if dbrec['content'].startswith('powerdnssec1')))
809 self.assertEqual(dbrec['content'], 'powerdnssec1.ds9a.nl')
810
811 def test_import_zone_bind(self):
812 payload = {
813 'name': 'example.org.',
814 'kind': 'Master',
815 'nameservers': [],
816 'soa_edit_api': '', # turn off so exact SOA comparison works.
817 }
818 payload['zone'] = """
819 $TTL 86400 ; 24 hours could have been written as 24h or 1d
820 ; $TTL used for all RRs without explicit TTL value
821 $ORIGIN example.org.
822 @ 1D IN SOA ns1.example.org. hostmaster.example.org. (
823 2002022401 ; serial
824 3H ; refresh
825 15 ; retry
826 1w ; expire
827 3h ; minimum
828 )
829 IN NS ns1.example.org. ; in the domain
830 IN NS ns2.smokeyjoe.com. ; external to domain
831 IN MX 10 mail.another.com. ; external mail provider
832 ; server host definitions
833 ns1 IN A 192.168.0.1 ;name server definition
834 www IN A 192.168.0.2 ;web server definition
835 ftp IN CNAME www.example.org. ;ftp server definition
836 ; non server domain hosts
837 bill IN A 192.168.0.3
838 fred IN A 192.168.0.4
839 """
840 r = self.session.post(
841 self.url("/api/v1/servers/localhost/zones"),
842 data=json.dumps(payload),
843 headers={'content-type': 'application/json'})
844 self.assert_success_json(r)
845 data = r.json()
846 self.assertIn('name', data)
847
848 expected = {
849 'NS': [
850 {'content': 'ns1.example.org.'},
851 {'content': 'ns2.smokeyjoe.com.'},
852 ],
853 'SOA': [
854 {'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800'},
855 ],
856 'MX': [
857 {'content': '10 mail.another.com.'},
858 ],
859 'A': [
860 {'content': '192.168.0.1', 'name': 'ns1.example.org.'},
861 {'content': '192.168.0.2', 'name': 'www.example.org.'},
862 {'content': '192.168.0.3', 'name': 'bill.example.org.'},
863 {'content': '192.168.0.4', 'name': 'fred.example.org.'},
864 ],
865 'CNAME': [
866 {'content': 'www.example.org.', 'name': 'ftp.example.org.'},
867 ],
868 }
869
870 eq_zone_rrsets(data['rrsets'], expected)
871
872 def test_import_zone_bind_cname_apex(self):
873 payload = {
874 'name': unique_zone_name(),
875 'kind': 'Master',
876 'nameservers': [],
877 }
878 payload['zone'] = """
879 $ORIGIN %NAME%
880 @ IN SOA ns1.example.org. hostmaster.example.org. (2002022401 3H 15 1W 3H)
881 @ IN NS ns1.example.org.
882 @ IN NS ns2.smokeyjoe.com.
883 @ IN CNAME www.example.org.
884 """.replace('%NAME%', payload['name'])
885 r = self.session.post(
886 self.url("/api/v1/servers/localhost/zones"),
887 data=json.dumps(payload),
888 headers={'content-type': 'application/json'})
889 self.assertEquals(r.status_code, 422)
890 self.assertIn('Conflicts with another RRset', r.json()['error'])
891
892 def test_export_zone_json(self):
893 name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
894 # export it
895 r = self.session.get(
896 self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
897 headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
898 )
899 self.assert_success_json(r)
900 data = r.json()
901 self.assertIn('zone', data)
902 expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
903 name + '\t3600\tIN\tNS\tns2.foo.com.',
904 name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
905 ' 0 10800 3600 604800 3600']
906 self.assertEquals(data['zone'].strip().split('\n'), expected_data)
907
908 def test_export_zone_text(self):
909 name, payload, zone = self.create_zone(nameservers=['ns1.foo.com.', 'ns2.foo.com.'], soa_edit_api='')
910 # export it
911 r = self.session.get(
912 self.url("/api/v1/servers/localhost/zones/" + name + "/export"),
913 headers={'accept': '*/*'}
914 )
915 data = r.text.strip().split("\n")
916 expected_data = [name + '\t3600\tIN\tNS\tns1.foo.com.',
917 name + '\t3600\tIN\tNS\tns2.foo.com.',
918 name + '\t3600\tIN\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
919 ' 0 10800 3600 604800 3600']
920 self.assertEquals(data, expected_data)
921
922 def test_update_zone(self):
923 name, payload, zone = self.create_zone()
924 name = payload['name']
925 # update, set as Master and enable SOA-EDIT-API
926 payload = {
927 'kind': 'Master',
928 'masters': ['192.0.2.1', '192.0.2.2'],
929 'soa_edit_api': 'EPOCH',
930 'soa_edit': 'EPOCH'
931 }
932 r = self.session.put(
933 self.url("/api/v1/servers/localhost/zones/" + name),
934 data=json.dumps(payload),
935 headers={'content-type': 'application/json'})
936 self.assert_success(r)
937 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
938 for k in payload.keys():
939 self.assertIn(k, data)
940 self.assertEquals(data[k], payload[k])
941 # update, back to Native and empty(off)
942 payload = {
943 'kind': 'Native',
944 'soa_edit_api': '',
945 'soa_edit': ''
946 }
947 r = self.session.put(
948 self.url("/api/v1/servers/localhost/zones/" + name),
949 data=json.dumps(payload),
950 headers={'content-type': 'application/json'})
951 self.assert_success(r)
952 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
953 for k in payload.keys():
954 self.assertIn(k, data)
955 self.assertEquals(data[k], payload[k])
956
957 def test_zone_rr_update(self):
958 name, payload, zone = self.create_zone()
959 # do a replace (= update)
960 rrset = {
961 'changetype': 'replace',
962 'name': name,
963 'type': 'ns',
964 'ttl': 3600,
965 'records': [
966 {
967 "content": "ns1.bar.com.",
968 "disabled": False
969 },
970 {
971 "content": "ns2-disabled.bar.com.",
972 "disabled": True
973 }
974 ]
975 }
976 payload = {'rrsets': [rrset]}
977 r = self.session.patch(
978 self.url("/api/v1/servers/localhost/zones/" + name),
979 data=json.dumps(payload),
980 headers={'content-type': 'application/json'})
981 self.assert_success(r)
982 # verify that (only) the new record is there
983 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
984 self.assertEquals(get_rrset(data, name, 'NS')['records'], rrset['records'])
985
986 def test_zone_rr_update_mx(self):
987 # Important to test with MX records, as they have a priority field, which must end up in the content field.
988 name, payload, zone = self.create_zone()
989 # do a replace (= update)
990 rrset = {
991 'changetype': 'replace',
992 'name': name,
993 'type': 'MX',
994 'ttl': 3600,
995 'records': [
996 {
997 "content": "10 mail.example.org.",
998 "disabled": False
999 }
1000 ]
1001 }
1002 payload = {'rrsets': [rrset]}
1003 r = self.session.patch(
1004 self.url("/api/v1/servers/localhost/zones/" + name),
1005 data=json.dumps(payload),
1006 headers={'content-type': 'application/json'})
1007 self.assert_success(r)
1008 # verify that (only) the new record is there
1009 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1010 self.assertEquals(get_rrset(data, name, 'MX')['records'], rrset['records'])
1011
1012 def test_zone_rr_update_invalid_mx(self):
1013 name, payload, zone = self.create_zone()
1014 # do a replace (= update)
1015 rrset = {
1016 'changetype': 'replace',
1017 'name': name,
1018 'type': 'MX',
1019 'ttl': 3600,
1020 'records': [
1021 {
1022 "content": "10 mail@mx.example.org.",
1023 "disabled": False
1024 }
1025 ]
1026 }
1027 payload = {'rrsets': [rrset]}
1028 r = self.session.patch(
1029 self.url("/api/v1/servers/localhost/zones/" + name),
1030 data=json.dumps(payload),
1031 headers={'content-type': 'application/json'})
1032 self.assertEquals(r.status_code, 422)
1033 self.assertIn('non-hostname content', r.json()['error'])
1034 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1035 self.assertIsNone(get_rrset(data, name, 'MX'))
1036
1037 def test_zone_rr_update_opt(self):
1038 name, payload, zone = self.create_zone()
1039 # do a replace (= update)
1040 rrset = {
1041 'changetype': 'replace',
1042 'name': name,
1043 'type': 'OPT',
1044 'ttl': 3600,
1045 'records': [
1046 {
1047 "content": "9",
1048 "disabled": False
1049 }
1050 ]
1051 }
1052 payload = {'rrsets': [rrset]}
1053 r = self.session.patch(
1054 self.url("/api/v1/servers/localhost/zones/" + name),
1055 data=json.dumps(payload),
1056 headers={'content-type': 'application/json'})
1057 self.assertEquals(r.status_code, 422)
1058 self.assertIn('OPT: invalid type given', r.json()['error'])
1059
1060 def test_zone_rr_update_multiple_rrsets(self):
1061 name, payload, zone = self.create_zone()
1062 rrset1 = {
1063 'changetype': 'replace',
1064 'name': name,
1065 'type': 'NS',
1066 'ttl': 3600,
1067 'records': [
1068 {
1069
1070 "content": "ns9999.example.com.",
1071 "disabled": False
1072 }
1073 ]
1074 }
1075 rrset2 = {
1076 'changetype': 'replace',
1077 'name': name,
1078 'type': 'MX',
1079 'ttl': 3600,
1080 'records': [
1081 {
1082 "content": "10 mx444.example.com.",
1083 "disabled": False
1084 }
1085 ]
1086 }
1087 payload = {'rrsets': [rrset1, rrset2]}
1088 r = self.session.patch(
1089 self.url("/api/v1/servers/localhost/zones/" + name),
1090 data=json.dumps(payload),
1091 headers={'content-type': 'application/json'})
1092 self.assert_success(r)
1093 # verify that all rrsets have been updated
1094 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1095 self.assertEquals(get_rrset(data, name, 'NS')['records'], rrset1['records'])
1096 self.assertEquals(get_rrset(data, name, 'MX')['records'], rrset2['records'])
1097
1098 def test_zone_rr_update_duplicate_record(self):
1099 name, payload, zone = self.create_zone()
1100 rrset = {
1101 'changetype': 'replace',
1102 'name': name,
1103 'type': 'NS',
1104 'ttl': 3600,
1105 'records': [
1106 {"content": "ns9999.example.com.", "disabled": False},
1107 {"content": "ns9996.example.com.", "disabled": False},
1108 {"content": "ns9987.example.com.", "disabled": False},
1109 {"content": "ns9988.example.com.", "disabled": False},
1110 {"content": "ns9999.example.com.", "disabled": False},
1111 ]
1112 }
1113 payload = {'rrsets': [rrset]}
1114 r = self.session.patch(
1115 self.url("/api/v1/servers/localhost/zones/" + name),
1116 data=json.dumps(payload),
1117 headers={'content-type': 'application/json'})
1118 self.assertEquals(r.status_code, 422)
1119 self.assertIn('Duplicate record in RRset', r.json()['error'])
1120
1121 def test_zone_rr_update_duplicate_rrset(self):
1122 name, payload, zone = self.create_zone()
1123 rrset1 = {
1124 'changetype': 'replace',
1125 'name': name,
1126 'type': 'NS',
1127 'ttl': 3600,
1128 'records': [
1129 {
1130 "content": "ns9999.example.com.",
1131 "disabled": False
1132 }
1133 ]
1134 }
1135 rrset2 = {
1136 'changetype': 'replace',
1137 'name': name,
1138 'type': 'NS',
1139 'ttl': 3600,
1140 'records': [
1141 {
1142 "content": "ns9998.example.com.",
1143 "disabled": False
1144 }
1145 ]
1146 }
1147 payload = {'rrsets': [rrset1, rrset2]}
1148 r = self.session.patch(
1149 self.url("/api/v1/servers/localhost/zones/" + name),
1150 data=json.dumps(payload),
1151 headers={'content-type': 'application/json'})
1152 self.assertEquals(r.status_code, 422)
1153 self.assertIn('Duplicate RRset', r.json()['error'])
1154
1155 def test_zone_rr_delete(self):
1156 name, payload, zone = self.create_zone()
1157 # do a delete of all NS records (these are created with the zone)
1158 rrset = {
1159 'changetype': 'delete',
1160 'name': name,
1161 'type': 'NS'
1162 }
1163 payload = {'rrsets': [rrset]}
1164 r = self.session.patch(
1165 self.url("/api/v1/servers/localhost/zones/" + name),
1166 data=json.dumps(payload),
1167 headers={'content-type': 'application/json'})
1168 self.assert_success(r)
1169 # verify that the records are gone
1170 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1171 self.assertIsNone(get_rrset(data, name, 'NS'))
1172
1173 def test_zone_disable_reenable(self):
1174 # This also tests that SOA-EDIT-API works.
1175 name, payload, zone = self.create_zone(soa_edit_api='EPOCH')
1176 # disable zone by disabling SOA
1177 rrset = {
1178 'changetype': 'replace',
1179 'name': name,
1180 'type': 'SOA',
1181 'ttl': 3600,
1182 'records': [
1183 {
1184 "content": "ns1.bar.com. hostmaster.foo.org. 1 1 1 1 1",
1185 "disabled": True
1186 }
1187 ]
1188 }
1189 payload = {'rrsets': [rrset]}
1190 r = self.session.patch(
1191 self.url("/api/v1/servers/localhost/zones/" + name),
1192 data=json.dumps(payload),
1193 headers={'content-type': 'application/json'})
1194 self.assert_success(r)
1195 # check SOA serial has been edited
1196 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1197 soa_serial1 = get_first_rec(data, name, 'SOA')['content'].split()[2]
1198 self.assertNotEquals(soa_serial1, '1')
1199 # make sure domain is still in zone list (disabled SOA!)
1200 r = self.session.get(self.url("/api/v1/servers/localhost/zones"))
1201 domains = r.json()
1202 self.assertEquals(len([domain for domain in domains if domain['name'] == name]), 1)
1203 # sleep 1sec to ensure the EPOCH value changes for the next request
1204 time.sleep(1)
1205 # verify that modifying it still works
1206 rrset['records'][0]['disabled'] = False
1207 payload = {'rrsets': [rrset]}
1208 r = self.session.patch(
1209 self.url("/api/v1/servers/localhost/zones/" + name),
1210 data=json.dumps(payload),
1211 headers={'content-type': 'application/json'})
1212 self.assert_success(r)
1213 # check SOA serial has been edited again
1214 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1215 soa_serial2 = get_first_rec(data, name, 'SOA')['content'].split()[2]
1216 self.assertNotEquals(soa_serial2, '1')
1217 self.assertNotEquals(soa_serial2, soa_serial1)
1218
1219 def test_zone_rr_update_out_of_zone(self):
1220 name, payload, zone = self.create_zone()
1221 # replace with qname mismatch
1222 rrset = {
1223 'changetype': 'replace',
1224 'name': 'not-in-zone.',
1225 'type': 'NS',
1226 'ttl': 3600,
1227 'records': [
1228 {
1229 "content": "ns1.bar.com.",
1230 "disabled": False
1231 }
1232 ]
1233 }
1234 payload = {'rrsets': [rrset]}
1235 r = self.session.patch(
1236 self.url("/api/v1/servers/localhost/zones/" + name),
1237 data=json.dumps(payload),
1238 headers={'content-type': 'application/json'})
1239 self.assertEquals(r.status_code, 422)
1240 self.assertIn('out of zone', r.json()['error'])
1241
1242 def test_zone_rr_update_restricted_chars(self):
1243 name, payload, zone = self.create_zone()
1244 # replace with qname mismatch
1245 rrset = {
1246 'changetype': 'replace',
1247 'name': 'test:' + name,
1248 'type': 'NS',
1249 'ttl': 3600,
1250 'records': [
1251 {
1252 "content": "ns1.bar.com.",
1253 "disabled": False
1254 }
1255 ]
1256 }
1257 payload = {'rrsets': [rrset]}
1258 r = self.session.patch(
1259 self.url("/api/v1/servers/localhost/zones/" + name),
1260 data=json.dumps(payload),
1261 headers={'content-type': 'application/json'})
1262 self.assertEquals(r.status_code, 422)
1263 self.assertIn('contains unsupported characters', r.json()['error'])
1264
1265 def test_rrset_unknown_type(self):
1266 name, payload, zone = self.create_zone()
1267 rrset = {
1268 'changetype': 'replace',
1269 'name': name,
1270 'type': 'FAFAFA',
1271 'ttl': 3600,
1272 'records': [
1273 {
1274 "content": "4.3.2.1",
1275 "disabled": False
1276 }
1277 ]
1278 }
1279 payload = {'rrsets': [rrset]}
1280 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1281 headers={'content-type': 'application/json'})
1282 self.assertEquals(r.status_code, 422)
1283 self.assertIn('unknown type', r.json()['error'])
1284
1285 @parameterized.expand([
1286 ('CNAME', ),
1287 ('DNAME', ),
1288 ])
1289 def test_rrset_exclusive_and_other(self, qtype):
1290 name, payload, zone = self.create_zone()
1291 rrset = {
1292 'changetype': 'replace',
1293 'name': name,
1294 'type': qtype,
1295 'ttl': 3600,
1296 'records': [
1297 {
1298 "content": "example.org.",
1299 "disabled": False
1300 }
1301 ]
1302 }
1303 payload = {'rrsets': [rrset]}
1304 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1305 headers={'content-type': 'application/json'})
1306 self.assertEquals(r.status_code, 422)
1307 self.assertIn('Conflicts with pre-existing RRset', r.json()['error'])
1308
1309 @parameterized.expand([
1310 ('CNAME', ),
1311 ('DNAME', ),
1312 ])
1313 def test_rrset_other_and_exclusive(self, qtype):
1314 name, payload, zone = self.create_zone()
1315 rrset = {
1316 'changetype': 'replace',
1317 'name': 'sub.'+name,
1318 'type': qtype,
1319 'ttl': 3600,
1320 'records': [
1321 {
1322 "content": "example.org.",
1323 "disabled": False
1324 }
1325 ]
1326 }
1327 payload = {'rrsets': [rrset]}
1328 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1329 headers={'content-type': 'application/json'})
1330 self.assert_success(r)
1331 rrset = {
1332 'changetype': 'replace',
1333 'name': 'sub.'+name,
1334 'type': 'A',
1335 'ttl': 3600,
1336 'records': [
1337 {
1338 "content": "1.2.3.4",
1339 "disabled": False
1340 }
1341 ]
1342 }
1343 payload = {'rrsets': [rrset]}
1344 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1345 headers={'content-type': 'application/json'})
1346 self.assertEquals(r.status_code, 422)
1347 self.assertIn('Conflicts with pre-existing RRset', r.json()['error'])
1348
1349 @parameterized.expand([
1350 ('SOA', ['ns1.example.org. test@example.org. 10 10800 3600 604800 3600', 'ns2.example.org. test@example.org. 10 10800 3600 604800 3600']),
1351 ('CNAME', ['01.example.org.', '02.example.org.']),
1352 ('DNAME', ['01.example.org.', '02.example.org.']),
1353 ])
1354 def test_rrset_single_qtypes(self, qtype, contents):
1355 name, payload, zone = self.create_zone()
1356 rrset = {
1357 'changetype': 'replace',
1358 'name': 'sub.'+name,
1359 'type': qtype,
1360 'ttl': 3600,
1361 'records': [
1362 {
1363 "content": contents[0],
1364 "disabled": False
1365 },
1366 {
1367 "content": contents[1],
1368 "disabled": False
1369 }
1370 ]
1371 }
1372 payload = {'rrsets': [rrset]}
1373 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1374 headers={'content-type': 'application/json'})
1375 self.assertEquals(r.status_code, 422)
1376 self.assertIn('IN ' + qtype + ' has more than one record', r.json()['error'])
1377
1378 def test_create_zone_with_leading_space(self):
1379 # Actual regression.
1380 name, payload, zone = self.create_zone()
1381 rrset = {
1382 'changetype': 'replace',
1383 'name': name,
1384 'type': 'A',
1385 'ttl': 3600,
1386 'records': [
1387 {
1388 "content": " 4.3.2.1",
1389 "disabled": False
1390 }
1391 ]
1392 }
1393 payload = {'rrsets': [rrset]}
1394 r = self.session.patch(self.url("/api/v1/servers/localhost/zones/" + name), data=json.dumps(payload),
1395 headers={'content-type': 'application/json'})
1396 self.assertEquals(r.status_code, 422)
1397 self.assertIn('Not in expected format', r.json()['error'])
1398
1399 def test_zone_rr_delete_out_of_zone(self):
1400 name, payload, zone = self.create_zone()
1401 rrset = {
1402 'changetype': 'delete',
1403 'name': 'not-in-zone.',
1404 'type': 'NS'
1405 }
1406 payload = {'rrsets': [rrset]}
1407 r = self.session.patch(
1408 self.url("/api/v1/servers/localhost/zones/" + name),
1409 data=json.dumps(payload),
1410 headers={'content-type': 'application/json'})
1411 print(r.content)
1412 self.assert_success(r) # succeed so users can fix their wrong, old data
1413
1414 def test_zone_delete(self):
1415 name, payload, zone = self.create_zone()
1416 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
1417 self.assertEquals(r.status_code, 204)
1418 self.assertNotIn('Content-Type', r.headers)
1419
1420 def test_zone_comment_create(self):
1421 name, payload, zone = self.create_zone()
1422 rrset = {
1423 'changetype': 'replace',
1424 'name': name,
1425 'type': 'NS',
1426 'ttl': 3600,
1427 'comments': [
1428 {
1429 'account': 'test1',
1430 'content': 'blah blah',
1431 },
1432 {
1433 'account': 'test2',
1434 'content': 'blah blah bleh',
1435 }
1436 ]
1437 }
1438 payload = {'rrsets': [rrset]}
1439 r = self.session.patch(
1440 self.url("/api/v1/servers/localhost/zones/" + name),
1441 data=json.dumps(payload),
1442 headers={'content-type': 'application/json'})
1443 self.assert_success(r)
1444 # make sure the comments have been set, and that the NS
1445 # records are still present
1446 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1447 serverset = get_rrset(data, name, 'NS')
1448 print(serverset)
1449 self.assertNotEquals(serverset['records'], [])
1450 self.assertNotEquals(serverset['comments'], [])
1451 # verify that modified_at has been set by pdns
1452 self.assertNotEquals([c for c in serverset['comments']][0]['modified_at'], 0)
1453 # verify that TTL is correct (regression test)
1454 self.assertEquals(serverset['ttl'], 3600)
1455
1456 def test_zone_comment_delete(self):
1457 # Test: Delete ONLY comments.
1458 name, payload, zone = self.create_zone()
1459 rrset = {
1460 'changetype': 'replace',
1461 'name': name,
1462 'type': 'NS',
1463 'comments': []
1464 }
1465 payload = {'rrsets': [rrset]}
1466 r = self.session.patch(
1467 self.url("/api/v1/servers/localhost/zones/" + name),
1468 data=json.dumps(payload),
1469 headers={'content-type': 'application/json'})
1470 self.assert_success(r)
1471 # make sure the NS records are still present
1472 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1473 serverset = get_rrset(data, name, 'NS')
1474 print(serverset)
1475 self.assertNotEquals(serverset['records'], [])
1476 self.assertEquals(serverset['comments'], [])
1477
1478 def test_zone_comment_out_of_range_modified_at(self):
1479 # Test if comments on an rrset stay intact if the rrset is replaced
1480 name, payload, zone = self.create_zone()
1481 rrset = {
1482 'changetype': 'replace',
1483 'name': name,
1484 'type': 'NS',
1485 'comments': [
1486 {
1487 'account': 'test1',
1488 'content': 'oh hi there',
1489 'modified_at': '4294967297'
1490 }
1491 ]
1492 }
1493 payload = {'rrsets': [rrset]}
1494 r = self.session.patch(
1495 self.url("/api/v1/servers/localhost/zones/" + name),
1496 data=json.dumps(payload),
1497 headers={'content-type': 'application/json'})
1498 self.assertEquals(r.status_code, 422)
1499 self.assertIn("Value for key 'modified_at' is out of range", r.json()['error'])
1500
1501 def test_zone_comment_stay_intact(self):
1502 # Test if comments on an rrset stay intact if the rrset is replaced
1503 name, payload, zone = self.create_zone()
1504 # create a comment
1505 rrset = {
1506 'changetype': 'replace',
1507 'name': name,
1508 'type': 'NS',
1509 'comments': [
1510 {
1511 'account': 'test1',
1512 'content': 'oh hi there',
1513 'modified_at': 1111
1514 }
1515 ]
1516 }
1517 payload = {'rrsets': [rrset]}
1518 r = self.session.patch(
1519 self.url("/api/v1/servers/localhost/zones/" + name),
1520 data=json.dumps(payload),
1521 headers={'content-type': 'application/json'})
1522 self.assert_success(r)
1523 # replace rrset records
1524 rrset2 = {
1525 'changetype': 'replace',
1526 'name': name,
1527 'type': 'NS',
1528 'ttl': 3600,
1529 'records': [
1530 {
1531 "content": "ns1.bar.com.",
1532 "disabled": False
1533 }
1534 ]
1535 }
1536 payload2 = {'rrsets': [rrset2]}
1537 r = self.session.patch(
1538 self.url("/api/v1/servers/localhost/zones/" + name),
1539 data=json.dumps(payload2),
1540 headers={'content-type': 'application/json'})
1541 self.assert_success(r)
1542 # make sure the comments still exist
1543 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + name)).json()
1544 serverset = get_rrset(data, name, 'NS')
1545 print(serverset)
1546 self.assertEquals(serverset['records'], rrset2['records'])
1547 self.assertEquals(serverset['comments'], rrset['comments'])
1548
1549 def test_zone_auto_ptr_ipv4_create(self):
1550 revzone = '4.2.192.in-addr.arpa.'
1551 _, _, revzonedata = self.create_zone(name=revzone)
1552 name = unique_zone_name()
1553 rrset = {
1554 "name": name,
1555 "type": "A",
1556 "ttl": 3600,
1557 "records": [{
1558 "content": "192.2.4.44",
1559 "disabled": False,
1560 "set-ptr": True,
1561 }],
1562 }
1563 name, payload, data = self.create_zone(name=name, rrsets=[rrset])
1564 del rrset['records'][0]['set-ptr']
1565 self.assertEquals(get_rrset(data, name, 'A')['records'], rrset['records'])
1566 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1567 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1568 print(revsets)
1569 self.assertEquals(revsets, [{
1570 u'name': u'44.4.2.192.in-addr.arpa.',
1571 u'ttl': 3600,
1572 u'type': u'PTR',
1573 u'comments': [],
1574 u'records': [{
1575 u'content': name,
1576 u'disabled': False,
1577 }],
1578 }])
1579 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1580 self.assertGreater(r['serial'], revzonedata['serial'])
1581
1582 def test_zone_auto_ptr_ipv4_update(self):
1583 revzone = '0.2.192.in-addr.arpa.'
1584 _, _, revzonedata = self.create_zone(name=revzone)
1585 name, payload, zone = self.create_zone()
1586 rrset = {
1587 'changetype': 'replace',
1588 'name': name,
1589 'type': 'A',
1590 'ttl': 3600,
1591 'records': [
1592 {
1593 "content": '192.2.0.2',
1594 "disabled": False,
1595 "set-ptr": True
1596 }
1597 ]
1598 }
1599 payload = {'rrsets': [rrset]}
1600 r = self.session.patch(
1601 self.url("/api/v1/servers/localhost/zones/" + name),
1602 data=json.dumps(payload),
1603 headers={'content-type': 'application/json'})
1604 self.assert_success(r)
1605 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1606 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1607 print(revsets)
1608 self.assertEquals(revsets, [{
1609 u'name': u'2.0.2.192.in-addr.arpa.',
1610 u'ttl': 3600,
1611 u'type': u'PTR',
1612 u'comments': [],
1613 u'records': [{
1614 u'content': name,
1615 u'disabled': False,
1616 }],
1617 }])
1618 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1619 self.assertGreater(r['serial'], revzonedata['serial'])
1620
1621 def test_zone_auto_ptr_ipv6_update(self):
1622 # 2001:DB8::bb:aa
1623 revzone = '8.b.d.0.1.0.0.2.ip6.arpa.'
1624 _, _, revzonedata = self.create_zone(name=revzone)
1625 name, payload, zone = self.create_zone()
1626 rrset = {
1627 'changetype': 'replace',
1628 'name': name,
1629 'type': 'AAAA',
1630 'ttl': 3600,
1631 'records': [
1632 {
1633 "content": '2001:DB8::bb:aa',
1634 "disabled": False,
1635 "set-ptr": True
1636 }
1637 ]
1638 }
1639 payload = {'rrsets': [rrset]}
1640 r = self.session.patch(
1641 self.url("/api/v1/servers/localhost/zones/" + name),
1642 data=json.dumps(payload),
1643 headers={'content-type': 'application/json'})
1644 self.assert_success(r)
1645 r = self.session.get(self.url("/api/v1/servers/localhost/zones/" + revzone)).json()
1646 revsets = [s for s in r['rrsets'] if s['type'] == 'PTR']
1647 print(revsets)
1648 self.assertEquals(revsets, [{
1649 u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa.',
1650 u'ttl': 3600,
1651 u'type': u'PTR',
1652 u'comments': [],
1653 u'records': [{
1654 u'content': name,
1655 u'disabled': False,
1656 }],
1657 }])
1658 # with SOA-EDIT-API DEFAULT on the revzone, the serial should now be higher.
1659 self.assertGreater(r['serial'], revzonedata['serial'])
1660
1661 def test_search_rr_exact_zone(self):
1662 name = unique_zone_name()
1663 self.create_zone(name=name, serial=22, soa_edit_api='')
1664 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.')))
1665 self.assert_success_json(r)
1666 print(r.json())
1667 self.assertEquals(r.json(), [
1668 {u'object_type': u'zone', u'name': name, u'zone_id': name},
1669 {u'content': u'ns1.example.com.',
1670 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1671 u'ttl': 3600, u'type': u'NS', u'name': name},
1672 {u'content': u'ns2.example.com.',
1673 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1674 u'ttl': 3600, u'type': u'NS', u'name': name},
1675 {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
1676 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1677 u'ttl': 3600, u'type': u'SOA', u'name': name},
1678 ])
1679
1680 def test_search_rr_exact_zone_filter_type_zone(self):
1681 name = unique_zone_name()
1682 data_type = "zone"
1683 self.create_zone(name=name, serial=22, soa_edit_api='')
1684 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
1685 self.assert_success_json(r)
1686 print(r.json())
1687 self.assertEquals(r.json(), [
1688 {u'object_type': u'zone', u'name': name, u'zone_id': name},
1689 ])
1690
1691 def test_search_rr_exact_zone_filter_type_record(self):
1692 name = unique_zone_name()
1693 data_type = "record"
1694 self.create_zone(name=name, serial=22, soa_edit_api='')
1695 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name.rstrip('.') + "&object_type=" + data_type))
1696 self.assert_success_json(r)
1697 print(r.json())
1698 self.assertEquals(r.json(), [
1699 {u'content': u'ns1.example.com.',
1700 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1701 u'ttl': 3600, u'type': u'NS', u'name': name},
1702 {u'content': u'ns2.example.com.',
1703 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1704 u'ttl': 3600, u'type': u'NS', u'name': name},
1705 {u'content': u'a.misconfigured.powerdns.server. hostmaster.'+name+' 22 10800 3600 604800 3600',
1706 u'zone_id': name, u'zone': name, u'object_type': u'record', u'disabled': False,
1707 u'ttl': 3600, u'type': u'SOA', u'name': name},
1708 ])
1709
1710 def test_search_rr_substring(self):
1711 name = unique_zone_name()
1712 search = name[5:-5]
1713 self.create_zone(name=name)
1714 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
1715 self.assert_success_json(r)
1716 print(r.json())
1717 # should return zone, SOA, ns1, ns2
1718 self.assertEquals(len(r.json()), 4)
1719
1720 def test_search_rr_case_insensitive(self):
1721 name = unique_zone_name()+'testsuffix.'
1722 self.create_zone(name=name)
1723 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*testSUFFIX*"))
1724 self.assert_success_json(r)
1725 print(r.json())
1726 # should return zone, SOA, ns1, ns2
1727 self.assertEquals(len(r.json()), 4)
1728
1729 def test_search_after_rectify_with_ent(self):
1730 name = unique_zone_name()
1731 search = name.split('.')[0]
1732 rrset = {
1733 "name": 'sub.sub.' + name,
1734 "type": "A",
1735 "ttl": 3600,
1736 "records": [{
1737 "content": "4.3.2.1",
1738 "disabled": False,
1739 }],
1740 }
1741 self.create_zone(name=name, rrsets=[rrset])
1742 pdnsutil_rectify(name)
1743 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=*%s*" % search))
1744 self.assert_success_json(r)
1745 print(r.json())
1746 # should return zone, SOA, ns1, ns2, sub.sub A (but not the ENT)
1747 self.assertEquals(len(r.json()), 5)
1748
1749 def test_default_api_rectify(self):
1750 name = unique_zone_name()
1751 search = name.split('.')[0]
1752 rrsets = [
1753 {
1754 "name": 'a.' + name,
1755 "type": "AAAA",
1756 "ttl": 3600,
1757 "records": [{
1758 "content": "2001:DB8::1",
1759 "disabled": False,
1760 }],
1761 },
1762 {
1763 "name": 'b.' + name,
1764 "type": "AAAA",
1765 "ttl": 3600,
1766 "records": [{
1767 "content": "2001:DB8::2",
1768 "disabled": False,
1769 }],
1770 },
1771 ]
1772 self.create_zone(name=name, rrsets=rrsets, dnssec=True, nsec3param='1 0 1 ab')
1773 dbrecs = get_db_records(name, 'AAAA')
1774 self.assertIsNotNone(dbrecs[0]['ordername'])
1775
1776 def test_override_api_rectify(self):
1777 name = unique_zone_name()
1778 search = name.split('.')[0]
1779 rrsets = [
1780 {
1781 "name": 'a.' + name,
1782 "type": "AAAA",
1783 "ttl": 3600,
1784 "records": [{
1785 "content": "2001:DB8::1",
1786 "disabled": False,
1787 }],
1788 },
1789 {
1790 "name": 'b.' + name,
1791 "type": "AAAA",
1792 "ttl": 3600,
1793 "records": [{
1794 "content": "2001:DB8::2",
1795 "disabled": False,
1796 }],
1797 },
1798 ]
1799 self.create_zone(name=name, rrsets=rrsets, api_rectify=False, dnssec=True, nsec3param='1 0 1 ab')
1800 dbrecs = get_db_records(name, 'AAAA')
1801 self.assertIsNone(dbrecs[0]['ordername'])
1802
1803 def test_cname_at_ent_place(self):
1804 name, payload, zone = self.create_zone(api_rectify=True)
1805 rrset = {
1806 'changetype': 'replace',
1807 'name': 'sub2.sub1.' + name,
1808 'type': "A",
1809 'ttl': 3600,
1810 'records': [{
1811 'content': "4.3.2.1",
1812 'disabled': False,
1813 }],
1814 }
1815 payload = {'rrsets': [rrset]}
1816 r = self.session.patch(
1817 self.url("/api/v1/servers/localhost/zones/" + zone['id']),
1818 data=json.dumps(payload),
1819 headers={'content-type': 'application/json'})
1820 self.assertEquals(r.status_code, 204)
1821 rrset = {
1822 'changetype': 'replace',
1823 'name': 'sub1.' + name,
1824 'type': "CNAME",
1825 'ttl': 3600,
1826 'records': [{
1827 'content': "www.example.org.",
1828 'disabled': False,
1829 }],
1830 }
1831 payload = {'rrsets': [rrset]}
1832 r = self.session.patch(
1833 self.url("/api/v1/servers/localhost/zones/" + zone['id']),
1834 data=json.dumps(payload),
1835 headers={'content-type': 'application/json'})
1836 self.assertEquals(r.status_code, 204)
1837
1838 def test_rrset_parameter_post_false(self):
1839 name = unique_zone_name()
1840 payload = {
1841 'name': name,
1842 'kind': 'Native',
1843 'nameservers': ['ns1.example.com.', 'ns2.example.com.']
1844 }
1845 r = self.session.post(
1846 self.url("/api/v1/servers/localhost/zones?rrsets=false"),
1847 data=json.dumps(payload),
1848 headers={'content-type': 'application/json'})
1849 print(r.json())
1850 self.assert_success_json(r)
1851 self.assertEquals(r.status_code, 201)
1852 self.assertEquals(r.json().get('rrsets'), None)
1853
1854 def test_rrset_false_parameter(self):
1855 name = unique_zone_name()
1856 self.create_zone(name=name, kind='Native')
1857 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=false"))
1858 self.assert_success_json(r)
1859 print(r.json())
1860 self.assertEquals(r.json().get('rrsets'), None)
1861
1862 def test_rrset_true_parameter(self):
1863 name = unique_zone_name()
1864 self.create_zone(name=name, kind='Native')
1865 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=true"))
1866 self.assert_success_json(r)
1867 print(r.json())
1868 self.assertEquals(len(r.json().get('rrsets')), 2)
1869
1870 def test_wrong_rrset_parameter(self):
1871 name = unique_zone_name()
1872 self.create_zone(name=name, kind='Native')
1873 r = self.session.get(self.url("/api/v1/servers/localhost/zones/"+name+"?rrsets=foobar"))
1874 self.assertEquals(r.status_code, 422)
1875 self.assertIn("'rrsets' request parameter value 'foobar' is not supported", r.json()['error'])
1876
1877 def test_put_master_tsig_key_ids_non_existent(self):
1878 name = unique_zone_name()
1879 keyname = unique_zone_name().split('.')[0]
1880 self.create_zone(name=name, kind='Native')
1881 payload = {
1882 'master_tsig_key_ids': [keyname]
1883 }
1884 r = self.session.put(self.url('/api/v1/servers/localhost/zones/' + name),
1885 data=json.dumps(payload),
1886 headers={'content-type': 'application/json'})
1887 self.assertEquals(r.status_code, 422)
1888 self.assertIn('A TSIG key with the name', r.json()['error'])
1889
1890 def test_put_slave_tsig_key_ids_non_existent(self):
1891 name = unique_zone_name()
1892 keyname = unique_zone_name().split('.')[0]
1893 self.create_zone(name=name, kind='Native')
1894 payload = {
1895 'slave_tsig_key_ids': [keyname]
1896 }
1897 r = self.session.put(self.url('/api/v1/servers/localhost/zones/' + name),
1898 data=json.dumps(payload),
1899 headers={'content-type': 'application/json'})
1900 self.assertEquals(r.status_code, 422)
1901 self.assertIn('A TSIG key with the name', r.json()['error'])
1902
1903
1904 @unittest.skipIf(not is_auth(), "Not applicable")
1905 class AuthRootZone(ApiTestCase, AuthZonesHelperMixin):
1906
1907 def setUp(self):
1908 super(AuthRootZone, self).setUp()
1909 # zone name is not unique, so delete the zone before each individual test.
1910 self.session.delete(self.url("/api/v1/servers/localhost/zones/=2E"))
1911
1912 def test_create_zone(self):
1913 name, payload, data = self.create_zone(name='.', serial=22, soa_edit_api='')
1914 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit', 'account'):
1915 self.assertIn(k, data)
1916 if k in payload:
1917 self.assertEquals(data[k], payload[k])
1918 # validate generated SOA
1919 rec = get_first_rec(data, '.', 'SOA')
1920 self.assertEquals(
1921 rec['content'],
1922 "a.misconfigured.powerdns.server. hostmaster. " + str(payload['serial']) +
1923 " 10800 3600 604800 3600"
1924 )
1925 # Regression test: verify zone list works
1926 zonelist = self.session.get(self.url("/api/v1/servers/localhost/zones")).json()
1927 print("zonelist:", zonelist)
1928 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
1929 # Also test that fetching the zone works.
1930 print("id:", data['id'])
1931 self.assertEquals(data['id'], '=2E')
1932 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + data['id'])).json()
1933 print("zone (fetched):", data)
1934 for k in ('name', 'kind'):
1935 self.assertIn(k, data)
1936 self.assertEquals(data[k], payload[k])
1937 self.assertEqual(data['rrsets'][0]['name'], '.')
1938
1939 def test_update_zone(self):
1940 name, payload, zone = self.create_zone(name='.')
1941 zone_id = '=2E'
1942 # update, set as Master and enable SOA-EDIT-API
1943 payload = {
1944 'kind': 'Master',
1945 'masters': ['192.0.2.1', '192.0.2.2'],
1946 'soa_edit_api': 'EPOCH',
1947 'soa_edit': 'EPOCH'
1948 }
1949 r = self.session.put(
1950 self.url("/api/v1/servers/localhost/zones/" + zone_id),
1951 data=json.dumps(payload),
1952 headers={'content-type': 'application/json'})
1953 self.assert_success(r)
1954 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
1955 for k in payload.keys():
1956 self.assertIn(k, data)
1957 self.assertEquals(data[k], payload[k])
1958 # update, back to Native and empty(off)
1959 payload = {
1960 'kind': 'Native',
1961 'soa_edit_api': '',
1962 'soa_edit': ''
1963 }
1964 r = self.session.put(
1965 self.url("/api/v1/servers/localhost/zones/" + zone_id),
1966 data=json.dumps(payload),
1967 headers={'content-type': 'application/json'})
1968 self.assert_success(r)
1969 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + zone_id)).json()
1970 for k in payload.keys():
1971 self.assertIn(k, data)
1972 self.assertEquals(data[k], payload[k])
1973
1974
1975 @unittest.skipIf(not is_recursor(), "Not applicable")
1976 class RecursorZones(ApiTestCase):
1977
1978 def create_zone(self, name=None, kind=None, rd=False, servers=None):
1979 if name is None:
1980 name = unique_zone_name()
1981 if servers is None:
1982 servers = []
1983 payload = {
1984 'name': name,
1985 'kind': kind,
1986 'servers': servers,
1987 'recursion_desired': rd
1988 }
1989 r = self.session.post(
1990 self.url("/api/v1/servers/localhost/zones"),
1991 data=json.dumps(payload),
1992 headers={'content-type': 'application/json'})
1993 self.assert_success_json(r)
1994 return payload, r.json()
1995
1996 def test_create_auth_zone(self):
1997 payload, data = self.create_zone(kind='Native')
1998 for k in payload.keys():
1999 self.assertEquals(data[k], payload[k])
2000
2001 def test_create_zone_no_name(self):
2002 payload = {
2003 'name': '',
2004 'kind': 'Native',
2005 'servers': ['8.8.8.8'],
2006 'recursion_desired': False,
2007 }
2008 print(payload)
2009 r = self.session.post(
2010 self.url("/api/v1/servers/localhost/zones"),
2011 data=json.dumps(payload),
2012 headers={'content-type': 'application/json'})
2013 self.assertEquals(r.status_code, 422)
2014 self.assertIn('is not canonical', r.json()['error'])
2015
2016 def test_create_forwarded_zone(self):
2017 payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
2018 # return values are normalized
2019 payload['servers'][0] += ':53'
2020 for k in payload.keys():
2021 self.assertEquals(data[k], payload[k])
2022
2023 def test_create_forwarded_rd_zone(self):
2024 payload, data = self.create_zone(name='google.com.', kind='Forwarded', rd=True, servers=['8.8.8.8'])
2025 # return values are normalized
2026 payload['servers'][0] += ':53'
2027 for k in payload.keys():
2028 self.assertEquals(data[k], payload[k])
2029
2030 def test_create_auth_zone_with_symbols(self):
2031 payload, data = self.create_zone(name='foo/bar.'+unique_zone_name(), kind='Native')
2032 expected_id = (payload['name'].replace('/', '=2F'))
2033 for k in payload.keys():
2034 self.assertEquals(data[k], payload[k])
2035 self.assertEquals(data['id'], expected_id)
2036
2037 def test_rename_auth_zone(self):
2038 payload, data = self.create_zone(kind='Native')
2039 name = payload['name']
2040 # now rename it
2041 payload = {
2042 'name': 'renamed-'+name,
2043 'kind': 'Native',
2044 'recursion_desired': False
2045 }
2046 r = self.session.put(
2047 self.url("/api/v1/servers/localhost/zones/" + name),
2048 data=json.dumps(payload),
2049 headers={'content-type': 'application/json'})
2050 self.assert_success(r)
2051 data = self.session.get(self.url("/api/v1/servers/localhost/zones/" + payload['name'])).json()
2052 for k in payload.keys():
2053 self.assertEquals(data[k], payload[k])
2054
2055 def test_zone_delete(self):
2056 payload, zone = self.create_zone(kind='Native')
2057 name = payload['name']
2058 r = self.session.delete(self.url("/api/v1/servers/localhost/zones/" + name))
2059 self.assertEquals(r.status_code, 204)
2060 self.assertNotIn('Content-Type', r.headers)
2061
2062 def test_search_rr_exact_zone(self):
2063 name = unique_zone_name()
2064 self.create_zone(name=name, kind='Native')
2065 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=" + name))
2066 self.assert_success_json(r)
2067 print(r.json())
2068 self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name}])
2069
2070 def test_search_rr_substring(self):
2071 name = 'search-rr-zone.name.'
2072 self.create_zone(name=name, kind='Native')
2073 r = self.session.get(self.url("/api/v1/servers/localhost/search-data?q=rr-zone"))
2074 self.assert_success_json(r)
2075 print(r.json())
2076 # should return zone, SOA
2077 self.assertEquals(len(r.json()), 2)
2078
2079 @unittest.skipIf(not is_auth(), "Not applicable")
2080 class AuthZoneKeys(ApiTestCase, AuthZonesHelperMixin):
2081
2082 def test_get_keys(self):
2083 r = self.session.get(
2084 self.url("/api/v1/servers/localhost/zones/powerdnssec.org./cryptokeys"))
2085 self.assert_success_json(r)
2086 keys = r.json()
2087 self.assertGreater(len(keys), 0)
2088
2089 key0 = deepcopy(keys[0])
2090 del key0['dnskey']
2091 del key0['ds']
2092 expected = {
2093 u'algorithm': u'ECDSAP256SHA256',
2094 u'bits': 256,
2095 u'active': True,
2096 u'type': u'Cryptokey',
2097 u'keytype': u'csk',
2098 u'flags': 257,
2099 u'id': 1}
2100 self.assertEquals(key0, expected)
2101
2102 keydata = keys[0]['dnskey'].split()
2103 self.assertEqual(len(keydata), 4)