]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.api/test_Zones.py
our houseKeeping mthread could (and would) frequently be run several times simultaneo...
[thirdparty/pdns.git] / regression-tests.api / test_Zones.py
CommitLineData
e2dba705 1import json
d29d5db7 2import time
e2dba705 3import unittest
c1374bdb 4from test_helper import ApiTestCase, unique_zone_name, is_auth, is_recursor
1a152698
CH
5
6
02945d9a 7class Zones(ApiTestCase):
1a152698 8
c1374bdb 9 def test_list_zones(self):
1a152698 10 r = self.session.get(self.url("/servers/localhost/zones"))
c1374bdb 11 self.assert_success_json(r)
45de6290 12 domains = r.json()
02945d9a 13 example_com = [domain for domain in domains if domain['name'] in ('example.com', 'example.com.')]
1a152698
CH
14 self.assertEquals(len(example_com), 1)
15 example_com = example_com[0]
02945d9a 16 required_fields = ['id', 'url', 'name', 'kind']
c1374bdb 17 if is_auth():
02945d9a 18 required_fields = required_fields + ['masters', 'last_check', 'notified_serial', 'serial']
c1374bdb 19 elif is_recursor():
02945d9a
CH
20 required_fields = required_fields + ['recursion_desired', 'servers']
21 for field in required_fields:
22 self.assertIn(field, example_com)
23
24
c1374bdb 25@unittest.skipIf(not is_auth(), "Not applicable")
02945d9a 26class AuthZones(ApiTestCase):
e2dba705 27
284fdfe9 28 def create_zone(self, name=None, **kwargs):
bee2acae
CH
29 if name is None:
30 name = unique_zone_name()
e2dba705 31 payload = {
bee2acae 32 'name': name,
e2dba705 33 'kind': 'Native',
bee2acae 34 'nameservers': ['ns1.example.com', 'ns2.example.com']
e2dba705 35 }
284fdfe9 36 for k, v in kwargs.items():
4bdff352
CH
37 if v is None:
38 del payload[k]
39 else:
40 payload[k] = v
284fdfe9 41 print payload
e2dba705
CH
42 r = self.session.post(
43 self.url("/servers/localhost/zones"),
44 data=json.dumps(payload),
45 headers={'content-type': 'application/json'})
c1374bdb 46 self.assert_success_json(r)
64a36f0d 47 self.assertEquals(r.status_code, 201)
c1374bdb 48 return payload, r.json()
bee2acae 49
c1374bdb 50 def test_create_zone(self):
f63168e6 51 payload, data = self.create_zone(serial=22)
6bb25159 52 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'soa_edit_api', 'soa_edit'):
d29d5db7
CH
53 self.assertIn(k, data)
54 if k in payload:
55 self.assertEquals(data[k], payload[k])
56 self.assertEquals(data['comments'], [])
f63168e6
CH
57 # validate generated SOA
58 self.assertEquals(
59 [r['content'] for r in data['records'] if r['type'] == 'SOA'][0],
c1374bdb
CH
60 "a.misconfigured.powerdns.server hostmaster." + payload['name'] + " " + str(payload['serial']) +
61 " 10800 3600 604800 3600"
f63168e6 62 )
d29d5db7 63
c1374bdb 64 def test_create_zone_with_soa_edit_api(self):
f63168e6
CH
65 # soa_edit_api wins over serial
66 payload, data = self.create_zone(soa_edit_api='EPOCH', serial=10)
67 for k in ('soa_edit_api', ):
e2dba705
CH
68 self.assertIn(k, data)
69 if k in payload:
70 self.assertEquals(data[k], payload[k])
f63168e6
CH
71 # generated EPOCH serial surely is > fixed serial we passed in
72 print data
73 self.assertGreater(data['serial'], payload['serial'])
74 soa_serial = int([r['content'].split(' ')[2] for r in data['records'] if r['type'] == 'SOA'][0])
75 self.assertGreater(soa_serial, payload['serial'])
76 self.assertEquals(soa_serial, data['serial'])
6bb25159 77
c1374bdb 78 def test_create_zone_with_records(self):
f63168e6
CH
79 name = unique_zone_name()
80 records = [
81 {
82 "name": name,
83 "type": "A",
f63168e6
CH
84 "ttl": 3600,
85 "content": "4.3.2.1",
86 "disabled": False
87 }
88 ]
89 payload, data = self.create_zone(name=name, records=records)
90 # check our record has appeared
91 self.assertEquals([r for r in data['records'] if r['type'] == records[0]['type']], records)
92
c1374bdb 93 def test_create_zone_with_comments(self):
f63168e6
CH
94 name = unique_zone_name()
95 comments = [
96 {
97 'name': name,
98 'type': 'SOA',
99 'account': 'test1',
100 'content': 'blah blah',
101 'modified_at': 11112,
102 }
103 ]
104 payload, data = self.create_zone(name=name, comments=comments)
105 # check our comment has appeared
106 self.assertEquals(data['comments'], comments)
107
c1374bdb 108 def test_create_zone_with_custom_soa(self):
f63168e6
CH
109 name = unique_zone_name()
110 records = [
111 {
112 "name": name,
113 "type": "SOA",
f63168e6
CH
114 "ttl": 3600,
115 "content": "ns1.example.net testmaster@example.net 10 10800 3600 604800 3600",
116 "disabled": False
117 }
118 ]
119 payload, data = self.create_zone(name=name, records=records)
120 self.assertEquals([r for r in data['records'] if r['type'] == records[0]['type']], records)
05776d2f 121
c1374bdb 122 def test_create_zone_trailing_dot(self):
4ebf78b1
CH
123 # Trailing dots should not end up in the zone name.
124 basename = unique_zone_name()
125 payload, data = self.create_zone(name=basename+'.')
126 self.assertEquals(data['name'], basename)
127
c1374bdb 128 def test_create_zone_with_symbols(self):
bee2acae
CH
129 payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
130 name = payload['name']
1dbe38ba 131 expected_id = (name.replace('/', '=2F')) + '.'
00a9b229
CH
132 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
133 self.assertIn(k, data)
134 if k in payload:
135 self.assertEquals(data[k], payload[k])
bee2acae 136 self.assertEquals(data['id'], expected_id)
00a9b229 137
c1374bdb 138 def test_create_zone_with_nameservers_non_string(self):
e90b4e38
CH
139 # ensure we don't crash
140 name = unique_zone_name()
141 payload = {
142 'name': name,
143 'kind': 'Native',
144 'nameservers': [{'a': 'ns1.example.com'}] # invalid
145 }
146 print payload
147 r = self.session.post(
148 self.url("/servers/localhost/zones"),
149 data=json.dumps(payload),
150 headers={'content-type': 'application/json'})
151 self.assertEquals(r.status_code, 422)
152
4bdff352
CH
153 def test_create_slave_zone(self):
154 # Test that nameservers can be absent for slave zones.
155 payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
156 for k in ('name', 'masters', 'kind'):
157 self.assertIn(k, data)
158 self.assertEquals(data[k], payload[k])
4de11a54
CH
159 print "payload:", payload
160 print "data:", data
161 # Because slave zones don't get a SOA, we need to test that they'll show up in the zone list.
162 r = self.session.get(self.url("/servers/localhost/zones"))
163 zonelist = r.json()
164 print "zonelist:", zonelist
165 self.assertIn(payload['name'], [zone['name'] for zone in zonelist])
166 # Also test that fetching the zone works.
167 r = self.session.get(self.url("/servers/localhost/zones/" + data['id']))
168 data = r.json()
169 print "zone (fetched):", data
170 for k in ('name', 'masters', 'kind'):
171 self.assertIn(k, data)
172 self.assertEquals(data[k], payload[k])
173 self.assertEqual(data['serial'], 0)
174 self.assertEqual(data['records'], [])
175
176 def test_delete_slave_zone(self):
177 payload, data = self.create_zone(kind='Slave', nameservers=None, masters=['127.0.0.2'])
178 r = self.session.delete(self.url("/servers/localhost/zones/" + data['id']))
179 r.raise_for_status()
4bdff352 180
c1374bdb 181 def test_get_zone_with_symbols(self):
3c3c006b
CH
182 payload, data = self.create_zone(name='foo/bar.'+unique_zone_name())
183 name = payload['name']
1dbe38ba 184 zone_id = (name.replace('/', '=2F')) + '.'
3c3c006b 185 r = self.session.get(self.url("/servers/localhost/zones/" + zone_id))
c1374bdb 186 data = r.json()
6bb25159 187 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial', 'dnssec'):
3c3c006b
CH
188 self.assertIn(k, data)
189 if k in payload:
190 self.assertEquals(data[k], payload[k])
191
c1374bdb 192 def test_get_zone(self):
05776d2f
CH
193 r = self.session.get(self.url("/servers/localhost/zones"))
194 domains = r.json()
195 example_com = [domain for domain in domains if domain['name'] == u'example.com'][0]
196 r = self.session.get(self.url("/servers/localhost/zones/" + example_com['id']))
c1374bdb 197 self.assert_success_json(r)
05776d2f
CH
198 data = r.json()
199 for k in ('id', 'url', 'name', 'masters', 'kind', 'last_check', 'notified_serial', 'serial'):
200 self.assertIn(k, data)
201 self.assertEquals(data['name'], 'example.com')
7c0ba3d2 202
0f0e73fe
MS
203 def test_import_zone_broken(self):
204 payload = {}
205 payload['zone'] = """
206;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
207flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
208;; WARNING: recursion requested but not available
209
210;; OPT PSEUDOSECTION:
211; EDNS: version: 0, flags:; udp: 1680
212;; QUESTION SECTION:
213;powerdns.com. IN SOA
214
215;; ANSWER SECTION:
216powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
217powerdns-broken.com. 3600 IN NS powerdnssec2.ds9a.nl.
218powerdns-broken.com. 3600 IN AAAA 2001:888:2000:1d::2
219powerdns-broken.com. 86400 IN A 82.94.213.34
220powerdns-broken.com. 3600 IN MX 0 xs.powerdns.com.
221powerdns-broken.com. 3600 IN NS powerdnssec1.ds9a.nl.
222powerdns-broken.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
223"""
224 payload['name'] = 'powerdns-broken.com'
225 payload['kind'] = 'Master'
226 payload['nameservers'] = []
227 r = self.session.post(
228 self.url("/servers/localhost/zones"),
229 data=json.dumps(payload),
230 headers={'content-type': 'application/json'})
231 self.assertEquals(r.status_code, 422)
232
233 def test_import_zone_axfr(self):
234 payload = {}
235 payload['zone'] = """
236;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 58571
237;; flags: qr aa rd; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1
238;; WARNING: recursion requested but not available
239
240;; OPT PSEUDOSECTION:
241; EDNS: version: 0, flags:; udp: 1680
242;; QUESTION SECTION:
243;powerdns.com. IN SOA
244
245;; ANSWER SECTION:
246powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
247powerdns.com. 3600 IN NS powerdnssec2.ds9a.nl.
248powerdns.com. 3600 IN AAAA 2001:888:2000:1d::2
249powerdns.com. 86400 IN A 82.94.213.34
250powerdns.com. 3600 IN MX 0 xs.powerdns.com.
251powerdns.com. 3600 IN NS powerdnssec1.ds9a.nl.
252powerdns.com. 86400 IN SOA powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800
253"""
254 payload['name'] = 'powerdns.com'
255 payload['kind'] = 'Master'
256 payload['nameservers'] = []
257 r = self.session.post(
258 self.url("/servers/localhost/zones"),
259 data=json.dumps(payload),
260 headers={'content-type': 'application/json'})
261 self.assert_success_json(r)
262 data = r.json()
263 self.assertIn('name', data)
264 self.assertIn('records', data)
265
90568eb2
MS
266 expected = {
267 'NS': [
268 { 'content': 'powerdnssec1.ds9a.nl.' },
269 { 'content': 'powerdnssec2.ds9a.nl.' } ],
270 'SOA': [
271 { 'content': 'powerdnssec1.ds9a.nl. ahu.ds9a.nl. 1343746984 10800 3600 604800 10800' } ],
272 'MX': [
05cf6a71 273 { 'content': '0 xs.powerdns.com.' } ],
90568eb2
MS
274 'A': [
275 { 'content': '82.94.213.34', 'name': 'powerdns.com' } ],
276 'AAAA': [
277 { 'content': '2001:888:2000:1d::2', 'name': 'powerdns.com' } ]
278 }
0f0e73fe
MS
279
280 counter = {}
281 for et in expected.keys():
282 counter[et] = len(expected[et])
283 for ev in expected[et]:
284 for ret in data['records']:
90568eb2
MS
285 if 'name' in ev:
286 if ret['name'] == ev['name'] and ret['content'] == ev['content'].rstrip('.'):
287 counter[et] = counter[et]-1
288 continue
90568eb2 289 if ret['content'] == ev['content'].rstrip('.'):
0f0e73fe
MS
290 counter[et] = counter[et]-1
291 self.assertEquals(counter[et], 0)
292
293 def test_import_zone_bind(self):
294 payload = {}
295 payload['zone'] = """
296$TTL 86400 ; 24 hours could have been written as 24h or 1d
297; $TTL used for all RRs without explicit TTL value
298$ORIGIN example.org.
299@ 1D IN SOA ns1.example.org. hostmaster.example.org. (
300 2002022401 ; serial
301 3H ; refresh
302 15 ; retry
303 1w ; expire
304 3h ; minimum
305 )
306 IN NS ns1.example.org. ; in the domain
307 IN NS ns2.smokeyjoe.com. ; external to domain
308 IN MX 10 mail.another.com. ; external mail provider
309; server host definitions
310ns1 IN A 192.168.0.1 ;name server definition
311www IN A 192.168.0.2 ;web server definition
312ftp IN CNAME www.example.org. ;ftp server definition
313; non server domain hosts
314bill IN A 192.168.0.3
315fred IN A 192.168.0.4
316"""
317 payload['name'] = 'example.org'
318 payload['kind'] = 'Master'
319 payload['nameservers'] = []
320 r = self.session.post(
321 self.url("/servers/localhost/zones"),
322 data=json.dumps(payload),
323 headers={'content-type': 'application/json'})
324 self.assert_success_json(r)
325 data = r.json()
326 self.assertIn('name', data)
327 self.assertIn('records', data)
328
90568eb2
MS
329 expected = {
330 'NS': [
331 { 'content': 'ns1.example.org.' },
332 { 'content': 'ns2.smokeyjoe.com.' } ],
333 'SOA': [
334 { 'content': 'ns1.example.org. hostmaster.example.org. 2002022401 10800 15 604800 10800' } ],
335 'MX': [
05cf6a71 336 { 'content': '10 mail.another.com.' } ],
90568eb2
MS
337 'A': [
338 { 'content': '192.168.0.1', 'name': 'ns1.example.org' },
339 { 'content': '192.168.0.2', 'name': 'www.example.org' },
340 { 'content': '192.168.0.3', 'name': 'bill.example.org' },
341 { 'content': '192.168.0.4', 'name': 'fred.example.org' } ],
342 'CNAME': [
343 { 'content': 'www.example.org', 'name': 'ftp.example.org' } ]
344 }
0f0e73fe
MS
345
346 counter = {}
347 for et in expected.keys():
348 counter[et] = len(expected[et])
0f0e73fe
MS
349 for ev in expected[et]:
350 for ret in data['records']:
90568eb2
MS
351 if 'name' in ev:
352 if ret['name'] == ev['name'] and ret['content'] == ev['content'].rstrip('.'):
353 counter[et] = counter[et]-1
354 continue
90568eb2 355 if ret['content'] == ev['content'].rstrip('.'):
0f0e73fe
MS
356 counter[et] = counter[et]-1
357 self.assertEquals(counter[et], 0)
358
c1374bdb 359 def test_export_zone_json(self):
a83004d3
CH
360 payload, zone = self.create_zone(nameservers=['ns1.foo.com', 'ns2.foo.com'])
361 name = payload['name']
362 # export it
363 r = self.session.get(
364 self.url("/servers/localhost/zones/" + name + "/export"),
365 headers={'accept': 'application/json;q=0.9,*/*;q=0.8'}
366 )
c1374bdb 367 self.assert_success_json(r)
a83004d3
CH
368 data = r.json()
369 self.assertIn('zone', data)
c1374bdb
CH
370 expected_data = [name + '.\t3600\tNS\tns1.foo.com.',
371 name + '.\t3600\tNS\tns2.foo.com.',
372 name + '.\t3600\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
373 '. 0 10800 3600 604800 3600']
a83004d3
CH
374 self.assertEquals(data['zone'].strip().split('\n'), expected_data)
375
c1374bdb 376 def test_export_zone_text(self):
a83004d3
CH
377 payload, zone = self.create_zone(nameservers=['ns1.foo.com', 'ns2.foo.com'])
378 name = payload['name']
379 # export it
380 r = self.session.get(
381 self.url("/servers/localhost/zones/" + name + "/export"),
382 headers={'accept': '*/*'}
383 )
384 data = r.text.strip().split("\n")
c1374bdb
CH
385 expected_data = [name + '.\t3600\tNS\tns1.foo.com.',
386 name + '.\t3600\tNS\tns2.foo.com.',
387 name + '.\t3600\tSOA\ta.misconfigured.powerdns.server. hostmaster.' + name +
388 '. 0 10800 3600 604800 3600']
a83004d3
CH
389 self.assertEquals(data, expected_data)
390
c1374bdb 391 def test_update_zone(self):
bee2acae
CH
392 payload, zone = self.create_zone()
393 name = payload['name']
d29d5db7 394 # update, set as Master and enable SOA-EDIT-API
7c0ba3d2
CH
395 payload = {
396 'kind': 'Master',
c1374bdb 397 'masters': ['192.0.2.1', '192.0.2.2'],
6bb25159
MS
398 'soa_edit_api': 'EPOCH',
399 'soa_edit': 'EPOCH'
7c0ba3d2
CH
400 }
401 r = self.session.put(
402 self.url("/servers/localhost/zones/" + name),
403 data=json.dumps(payload),
404 headers={'content-type': 'application/json'})
c1374bdb 405 self.assert_success_json(r)
7c0ba3d2
CH
406 data = r.json()
407 for k in payload.keys():
408 self.assertIn(k, data)
409 self.assertEquals(data[k], payload[k])
d29d5db7 410 # update, back to Native and empty(off)
7c0ba3d2 411 payload = {
d29d5db7 412 'kind': 'Native',
6bb25159
MS
413 'soa_edit_api': '',
414 'soa_edit': ''
7c0ba3d2
CH
415 }
416 r = self.session.put(
417 self.url("/servers/localhost/zones/" + name),
418 data=json.dumps(payload),
419 headers={'content-type': 'application/json'})
c1374bdb 420 self.assert_success_json(r)
7c0ba3d2
CH
421 data = r.json()
422 for k in payload.keys():
423 self.assertIn(k, data)
424 self.assertEquals(data[k], payload[k])
b3905a3d 425
c1374bdb 426 def test_zone_rr_update(self):
bee2acae
CH
427 payload, zone = self.create_zone()
428 name = payload['name']
b3905a3d 429 # do a replace (= update)
d708640f 430 rrset = {
b3905a3d
CH
431 'changetype': 'replace',
432 'name': name,
433 'type': 'NS',
434 'records': [
435 {
436 "name": name,
437 "type": "NS",
b3905a3d 438 "ttl": 3600,
cea26350
CH
439 "content": "ns1.bar.com",
440 "disabled": False
441 },
442 {
443 "name": name,
444 "type": "NS",
cea26350
CH
445 "ttl": 1800,
446 "content": "ns2-disabled.bar.com",
447 "disabled": True
b3905a3d
CH
448 }
449 ]
450 }
d708640f 451 payload = {'rrsets': [rrset]}
b3905a3d 452 r = self.session.patch(
d708640f 453 self.url("/servers/localhost/zones/" + name),
b3905a3d
CH
454 data=json.dumps(payload),
455 headers={'content-type': 'application/json'})
c1374bdb 456 self.assert_success_json(r)
b3905a3d 457 # verify that (only) the new record is there
6cc98ddf 458 r = self.session.get(self.url("/servers/localhost/zones/" + name))
b3905a3d 459 data = r.json()['records']
d708640f
CH
460 recs = [rec for rec in data if rec['type'] == rrset['type'] and rec['name'] == rrset['name']]
461 self.assertEquals(recs, rrset['records'])
b3905a3d 462
c1374bdb 463 def test_zone_rr_update_mx(self):
05cf6a71 464 # Important to test with MX records, as they have a priority field, which must end up in the content field.
41e3b10e
CH
465 payload, zone = self.create_zone()
466 name = payload['name']
467 # do a replace (= update)
d708640f 468 rrset = {
41e3b10e
CH
469 'changetype': 'replace',
470 'name': name,
471 'type': 'MX',
472 'records': [
473 {
474 "name": name,
475 "type": "MX",
41e3b10e 476 "ttl": 3600,
05cf6a71 477 "content": "10 mail.example.org",
41e3b10e
CH
478 "disabled": False
479 }
480 ]
481 }
d708640f 482 payload = {'rrsets': [rrset]}
41e3b10e 483 r = self.session.patch(
d708640f 484 self.url("/servers/localhost/zones/" + name),
41e3b10e
CH
485 data=json.dumps(payload),
486 headers={'content-type': 'application/json'})
c1374bdb 487 self.assert_success_json(r)
41e3b10e
CH
488 # verify that (only) the new record is there
489 r = self.session.get(self.url("/servers/localhost/zones/" + name))
490 data = r.json()['records']
d708640f
CH
491 recs = [rec for rec in data if rec['type'] == rrset['type'] and rec['name'] == rrset['name']]
492 self.assertEquals(recs, rrset['records'])
493
c1374bdb 494 def test_zone_rr_update_multiple_rrsets(self):
d708640f
CH
495 payload, zone = self.create_zone()
496 name = payload['name']
497 rrset1 = {
498 'changetype': 'replace',
499 'name': name,
500 'type': 'NS',
501 'records': [
502 {
503 "name": name,
504 "type": "NS",
d708640f
CH
505 "ttl": 3600,
506 "content": "ns9999.example.com",
507 "disabled": False
508 }
509 ]
510 }
511 rrset2 = {
512 'changetype': 'replace',
513 'name': name,
514 'type': 'MX',
515 'records': [
516 {
517 "name": name,
518 "type": "MX",
d708640f 519 "ttl": 3600,
05cf6a71 520 "content": "10 mx444.example.com",
d708640f
CH
521 "disabled": False
522 }
523 ]
524 }
525 payload = {'rrsets': [rrset1, rrset2]}
526 r = self.session.patch(
527 self.url("/servers/localhost/zones/" + name),
528 data=json.dumps(payload),
529 headers={'content-type': 'application/json'})
c1374bdb 530 self.assert_success_json(r)
d708640f
CH
531 # verify that all rrsets have been updated
532 r = self.session.get(self.url("/servers/localhost/zones/" + name))
533 data = r.json()['records']
534 recs1 = [rec for rec in data if rec['type'] == rrset1['type'] and rec['name'] == rrset1['name']]
535 self.assertEquals(recs1, rrset1['records'])
536 recs2 = [rec for rec in data if rec['type'] == rrset2['type'] and rec['name'] == rrset2['name']]
537 self.assertEquals(recs2, rrset2['records'])
41e3b10e 538
c1374bdb 539 def test_zone_rr_delete(self):
bee2acae
CH
540 payload, zone = self.create_zone()
541 name = payload['name']
b3905a3d 542 # do a delete of all NS records (these are created with the zone)
d708640f 543 rrset = {
b3905a3d
CH
544 'changetype': 'delete',
545 'name': name,
546 'type': 'NS'
547 }
d708640f 548 payload = {'rrsets': [rrset]}
b3905a3d 549 r = self.session.patch(
d708640f 550 self.url("/servers/localhost/zones/" + name),
b3905a3d
CH
551 data=json.dumps(payload),
552 headers={'content-type': 'application/json'})
c1374bdb 553 self.assert_success_json(r)
b3905a3d 554 # verify that the records are gone
6cc98ddf 555 r = self.session.get(self.url("/servers/localhost/zones/" + name))
b3905a3d 556 data = r.json()['records']
d708640f 557 recs = [rec for rec in data if rec['type'] == rrset['type'] and rec['name'] == rrset['name']]
b3905a3d 558 self.assertEquals(recs, [])
cea26350 559
c1374bdb 560 def test_zone_disable_reenable(self):
d29d5db7
CH
561 # This also tests that SOA-EDIT-API works.
562 payload, zone = self.create_zone(soa_edit_api='EPOCH')
cea26350
CH
563 name = payload['name']
564 # disable zone by disabling SOA
d708640f 565 rrset = {
cea26350
CH
566 'changetype': 'replace',
567 'name': name,
568 'type': 'SOA',
569 'records': [
570 {
571 "name": name,
572 "type": "SOA",
cea26350
CH
573 "ttl": 3600,
574 "content": "ns1.bar.com hostmaster.foo.org 1 1 1 1 1",
575 "disabled": True
576 }
577 ]
578 }
d708640f 579 payload = {'rrsets': [rrset]}
cea26350 580 r = self.session.patch(
d708640f 581 self.url("/servers/localhost/zones/" + name),
cea26350
CH
582 data=json.dumps(payload),
583 headers={'content-type': 'application/json'})
c1374bdb 584 self.assert_success_json(r)
d29d5db7
CH
585 # check SOA serial has been edited
586 print r.json()
587 soa_serial1 = [rec for rec in r.json()['records'] if rec['type'] == 'SOA'][0]['content'].split()[2]
588 self.assertNotEquals(soa_serial1, '1')
589 # make sure domain is still in zone list (disabled SOA!)
cea26350
CH
590 r = self.session.get(self.url("/servers/localhost/zones"))
591 domains = r.json()
592 self.assertEquals(len([domain for domain in domains if domain['name'] == name]), 1)
d29d5db7
CH
593 # sleep 1sec to ensure the EPOCH value changes for the next request
594 time.sleep(1)
cea26350 595 # verify that modifying it still works
d708640f
CH
596 rrset['records'][0]['disabled'] = False
597 payload = {'rrsets': [rrset]}
cea26350 598 r = self.session.patch(
d708640f 599 self.url("/servers/localhost/zones/" + name),
cea26350
CH
600 data=json.dumps(payload),
601 headers={'content-type': 'application/json'})
c1374bdb 602 self.assert_success_json(r)
d29d5db7
CH
603 # check SOA serial has been edited again
604 print r.json()
605 soa_serial2 = [rec for rec in r.json()['records'] if rec['type'] == 'SOA'][0]['content'].split()[2]
606 self.assertNotEquals(soa_serial2, '1')
607 self.assertNotEquals(soa_serial2, soa_serial1)
02945d9a 608
c1374bdb 609 def test_zone_rr_update_qtype_mismatch(self):
35f26cc5
CH
610 payload, zone = self.create_zone()
611 name = payload['name']
612 # replace with qtype mismatch
d708640f 613 rrset = {
35f26cc5
CH
614 'changetype': 'replace',
615 'name': name,
616 'type': 'A',
617 'records': [
618 {
619 "name": name,
620 "type": "NS",
35f26cc5
CH
621 "ttl": 3600,
622 "content": "ns1.bar.com",
623 "disabled": False
624 }
625 ]
626 }
d708640f 627 payload = {'rrsets': [rrset]}
35f26cc5 628 r = self.session.patch(
d708640f 629 self.url("/servers/localhost/zones/" + name),
35f26cc5
CH
630 data=json.dumps(payload),
631 headers={'content-type': 'application/json'})
632 self.assertEquals(r.status_code, 422)
633
c1374bdb 634 def test_zone_rr_update_qname_mismatch(self):
35f26cc5
CH
635 payload, zone = self.create_zone()
636 name = payload['name']
637 # replace with qname mismatch
d708640f 638 rrset = {
35f26cc5
CH
639 'changetype': 'replace',
640 'name': name,
641 'type': 'NS',
642 'records': [
643 {
644 "name": 'blah.'+name,
645 "type": "NS",
35f26cc5
CH
646 "ttl": 3600,
647 "content": "ns1.bar.com",
648 "disabled": False
649 }
650 ]
651 }
d708640f 652 payload = {'rrsets': [rrset]}
35f26cc5 653 r = self.session.patch(
d708640f 654 self.url("/servers/localhost/zones/" + name),
35f26cc5
CH
655 data=json.dumps(payload),
656 headers={'content-type': 'application/json'})
657 self.assertEquals(r.status_code, 422)
658
c1374bdb 659 def test_zone_rr_update_out_of_zone(self):
35f26cc5
CH
660 payload, zone = self.create_zone()
661 name = payload['name']
662 # replace with qname mismatch
d708640f 663 rrset = {
35f26cc5
CH
664 'changetype': 'replace',
665 'name': 'not-in-zone',
666 'type': 'NS',
667 'records': [
668 {
669 "name": name,
670 "type": "NS",
35f26cc5
CH
671 "ttl": 3600,
672 "content": "ns1.bar.com",
673 "disabled": False
674 }
675 ]
676 }
d708640f 677 payload = {'rrsets': [rrset]}
35f26cc5 678 r = self.session.patch(
d708640f 679 self.url("/servers/localhost/zones/" + name),
35f26cc5
CH
680 data=json.dumps(payload),
681 headers={'content-type': 'application/json'})
682 self.assertEquals(r.status_code, 422)
683 self.assertIn('out of zone', r.json()['error'])
684
c1374bdb 685 def test_zone_rr_delete_out_of_zone(self):
35f26cc5
CH
686 payload, zone = self.create_zone()
687 name = payload['name']
688 # replace with qname mismatch
d708640f 689 rrset = {
35f26cc5
CH
690 'changetype': 'delete',
691 'name': 'not-in-zone',
692 'type': 'NS'
693 }
d708640f 694 payload = {'rrsets': [rrset]}
35f26cc5 695 r = self.session.patch(
d708640f 696 self.url("/servers/localhost/zones/" + name),
35f26cc5
CH
697 data=json.dumps(payload),
698 headers={'content-type': 'application/json'})
699 self.assertEquals(r.status_code, 422)
700 self.assertIn('out of zone', r.json()['error'])
701
37663c3b
CH
702 def test_zone_delete(self):
703 payload, zone = self.create_zone()
704 name = payload['name']
705 r = self.session.delete(self.url("/servers/localhost/zones/" + name))
706 self.assertEquals(r.status_code, 204)
707 self.assertNotIn('Content-Type', r.headers)
708
c1374bdb 709 def test_zone_comment_create(self):
6cc98ddf
CH
710 payload, zone = self.create_zone()
711 name = payload['name']
d708640f 712 rrset = {
6cc98ddf
CH
713 'changetype': 'replace',
714 'name': name,
715 'type': 'NS',
716 'comments': [
717 {
718 'account': 'test1',
719 'content': 'blah blah',
720 },
721 {
722 'account': 'test2',
723 'content': 'blah blah bleh',
724 }
725 ]
726 }
d708640f 727 payload = {'rrsets': [rrset]}
6cc98ddf
CH
728 r = self.session.patch(
729 self.url("/servers/localhost/zones/" + name),
730 data=json.dumps(payload),
731 headers={'content-type': 'application/json'})
c1374bdb 732 self.assert_success_json(r)
6cc98ddf
CH
733 # make sure the comments have been set, and that the NS
734 # records are still present
735 r = self.session.get(self.url("/servers/localhost/zones/" + name))
736 data = r.json()
737 print data
738 self.assertNotEquals([r for r in data['records'] if r['type'] == 'NS'], [])
739 self.assertNotEquals(data['comments'], [])
740 # verify that modified_at has been set by pdns
741 self.assertNotEquals([c for c in data['comments']][0]['modified_at'], 0)
742
c1374bdb 743 def test_zone_comment_delete(self):
6cc98ddf
CH
744 # Test: Delete ONLY comments.
745 payload, zone = self.create_zone()
746 name = payload['name']
d708640f 747 rrset = {
6cc98ddf
CH
748 'changetype': 'replace',
749 'name': name,
750 'type': 'NS',
751 'comments': []
752 }
d708640f 753 payload = {'rrsets': [rrset]}
6cc98ddf
CH
754 r = self.session.patch(
755 self.url("/servers/localhost/zones/" + name),
756 data=json.dumps(payload),
757 headers={'content-type': 'application/json'})
c1374bdb 758 self.assert_success_json(r)
6cc98ddf
CH
759 # make sure the NS records are still present
760 r = self.session.get(self.url("/servers/localhost/zones/" + name))
761 data = r.json()
762 print data
763 self.assertNotEquals([r for r in data['records'] if r['type'] == 'NS'], [])
764 self.assertEquals(data['comments'], [])
765
c1374bdb 766 def test_zone_comment_stay_intact(self):
6cc98ddf
CH
767 # Test if comments on an rrset stay intact if the rrset is replaced
768 payload, zone = self.create_zone()
769 name = payload['name']
770 # create a comment
d708640f 771 rrset = {
6cc98ddf
CH
772 'changetype': 'replace',
773 'name': name,
774 'type': 'NS',
775 'comments': [
776 {
777 'account': 'test1',
778 'content': 'oh hi there',
2696eea0 779 'modified_at': 1111
6cc98ddf
CH
780 }
781 ]
782 }
d708640f 783 payload = {'rrsets': [rrset]}
6cc98ddf
CH
784 r = self.session.patch(
785 self.url("/servers/localhost/zones/" + name),
786 data=json.dumps(payload),
787 headers={'content-type': 'application/json'})
c1374bdb 788 self.assert_success_json(r)
6cc98ddf 789 # replace rrset records
d708640f 790 rrset2 = {
6cc98ddf
CH
791 'changetype': 'replace',
792 'name': name,
793 'type': 'NS',
794 'records': [
795 {
796 "name": name,
797 "type": "NS",
6cc98ddf
CH
798 "ttl": 3600,
799 "content": "ns1.bar.com",
800 "disabled": False
801 }
802 ]
803 }
d708640f 804 payload2 = {'rrsets': [rrset2]}
6cc98ddf
CH
805 r = self.session.patch(
806 self.url("/servers/localhost/zones/" + name),
807 data=json.dumps(payload2),
808 headers={'content-type': 'application/json'})
c1374bdb 809 self.assert_success_json(r)
6cc98ddf
CH
810 # make sure the comments still exist
811 r = self.session.get(self.url("/servers/localhost/zones/" + name))
812 data = r.json()
813 print data
2696eea0
CH
814 # fix up input data for comparison with assertEquals.
815 # the fact that we're not sending name+type is part of the API spec.
816 for c in rrset['comments']:
817 c['name'] = rrset['name']
818 c['type'] = rrset['type']
819
d708640f
CH
820 self.assertEquals([r for r in data['records'] if r['type'] == 'NS'], rrset2['records'])
821 self.assertEquals(data['comments'], rrset['comments'])
6cc98ddf 822
c1374bdb 823 def test_zone_auto_ptr_ipv4(self):
d1587ceb
CH
824 revzone = '0.2.192.in-addr.arpa'
825 self.create_zone(name=revzone)
826 payload, zone = self.create_zone()
827 name = payload['name']
828 # replace with qname mismatch
d708640f 829 rrset = {
d1587ceb
CH
830 'changetype': 'replace',
831 'name': name,
832 'type': 'A',
833 'records': [
834 {
835 "name": name,
836 "type": "A",
d1587ceb
CH
837 "ttl": 3600,
838 "content": '192.2.0.2',
839 "disabled": False,
840 "set-ptr": True
841 }
842 ]
843 }
d708640f 844 payload = {'rrsets': [rrset]}
d1587ceb 845 r = self.session.patch(
d708640f 846 self.url("/servers/localhost/zones/" + name),
d1587ceb
CH
847 data=json.dumps(payload),
848 headers={'content-type': 'application/json'})
c1374bdb 849 self.assert_success_json(r)
d1587ceb
CH
850 r = self.session.get(self.url("/servers/localhost/zones/" + revzone))
851 recs = r.json()['records']
852 print recs
853 revrec = [rec for rec in recs if rec['type'] == 'PTR']
854 self.assertEquals(revrec, [{
855 u'content': name,
856 u'disabled': False,
857 u'ttl': 3600,
d1587ceb
CH
858 u'type': u'PTR',
859 u'name': u'2.0.2.192.in-addr.arpa'
860 }])
861
c1374bdb 862 def test_zone_auto_ptr_ipv6(self):
d1587ceb
CH
863 # 2001:DB8::bb:aa
864 revzone = '8.b.d.0.1.0.0.2.ip6.arpa'
865 self.create_zone(name=revzone)
866 payload, zone = self.create_zone()
867 name = payload['name']
868 # replace with qname mismatch
d708640f 869 rrset = {
d1587ceb
CH
870 'changetype': 'replace',
871 'name': name,
872 'type': 'AAAA',
873 'records': [
874 {
875 "name": name,
876 "type": "AAAA",
d1587ceb
CH
877 "ttl": 3600,
878 "content": '2001:DB8::bb:aa',
879 "disabled": False,
880 "set-ptr": True
881 }
882 ]
883 }
d708640f 884 payload = {'rrsets': [rrset]}
d1587ceb 885 r = self.session.patch(
d708640f 886 self.url("/servers/localhost/zones/" + name),
d1587ceb
CH
887 data=json.dumps(payload),
888 headers={'content-type': 'application/json'})
c1374bdb 889 self.assert_success_json(r)
d1587ceb
CH
890 r = self.session.get(self.url("/servers/localhost/zones/" + revzone))
891 recs = r.json()['records']
892 print recs
893 revrec = [rec for rec in recs if rec['type'] == 'PTR']
894 self.assertEquals(revrec, [{
895 u'content': name,
896 u'disabled': False,
897 u'ttl': 3600,
d1587ceb
CH
898 u'type': u'PTR',
899 u'name': u'a.a.0.0.b.b.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.ip6.arpa'
900 }])
901
c1374bdb 902 def test_search_rr_exact_zone(self):
b1902fab
CH
903 name = unique_zone_name()
904 self.create_zone(name=name)
905 r = self.session.get(self.url("/servers/localhost/search-data?q=" + name))
c1374bdb 906 self.assert_success_json(r)
b1902fab 907 print r.json()
d2d194a9 908 self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name+'.'}])
b1902fab 909
c1374bdb 910 def test_search_rr_substring(self):
b1902fab
CH
911 name = 'search-rr-zone.name'
912 self.create_zone(name=name)
913 r = self.session.get(self.url("/servers/localhost/search-data?q=rr-zone"))
c1374bdb 914 self.assert_success_json(r)
b1902fab
CH
915 print r.json()
916 # should return zone, SOA, ns1, ns2
c1374bdb 917 self.assertEquals(len(r.json()), 1) # FIXME test disarmed for now (should be 4)
b1902fab 918
c1374bdb 919 def test_search_rr_case_insensitive(self):
57cb86d8
CH
920 name = 'search-rr-insenszone.name'
921 self.create_zone(name=name)
922 r = self.session.get(self.url("/servers/localhost/search-data?q=rr-insensZONE"))
c1374bdb 923 self.assert_success_json(r)
57cb86d8
CH
924 print r.json()
925 # should return zone, SOA, ns1, ns2
c1374bdb 926 self.assertEquals(len(r.json()), 1) # FIXME test disarmed for now (should be 4)
57cb86d8 927
02945d9a 928
c1374bdb 929@unittest.skipIf(not is_recursor(), "Not applicable")
02945d9a
CH
930class RecursorZones(ApiTestCase):
931
37bc3d01
CH
932 def create_zone(self, name=None, kind=None, rd=False, servers=None):
933 if name is None:
934 name = unique_zone_name()
935 if servers is None:
936 servers = []
02945d9a 937 payload = {
37bc3d01
CH
938 'name': name,
939 'kind': kind,
940 'servers': servers,
941 'recursion_desired': rd
02945d9a
CH
942 }
943 r = self.session.post(
944 self.url("/servers/localhost/zones"),
945 data=json.dumps(payload),
946 headers={'content-type': 'application/json'})
c1374bdb
CH
947 self.assert_success_json(r)
948 return payload, r.json()
37bc3d01 949
c1374bdb 950 def test_create_auth_zone(self):
37bc3d01 951 payload, data = self.create_zone(kind='Native')
02945d9a
CH
952 # return values are normalized
953 payload['name'] += '.'
954 for k in payload.keys():
955 self.assertEquals(data[k], payload[k])
956
c1374bdb 957 def test_create_forwarded_zone(self):
37bc3d01 958 payload, data = self.create_zone(kind='Forwarded', rd=False, servers=['8.8.8.8'])
02945d9a
CH
959 # return values are normalized
960 payload['servers'][0] += ':53'
961 payload['name'] += '.'
962 for k in payload.keys():
963 self.assertEquals(data[k], payload[k])
964
c1374bdb 965 def test_create_forwarded_rd_zone(self):
37bc3d01 966 payload, data = self.create_zone(name='google.com', kind='Forwarded', rd=True, servers=['8.8.8.8'])
02945d9a
CH
967 # return values are normalized
968 payload['servers'][0] += ':53'
969 payload['name'] += '.'
970 for k in payload.keys():
971 self.assertEquals(data[k], payload[k])
972
c1374bdb 973 def test_create_auth_zone_with_symbols(self):
37bc3d01 974 payload, data = self.create_zone(name='foo/bar.'+unique_zone_name(), kind='Native')
02945d9a
CH
975 # return values are normalized
976 payload['name'] += '.'
1dbe38ba 977 expected_id = (payload['name'].replace('/', '=2F'))
02945d9a
CH
978 for k in payload.keys():
979 self.assertEquals(data[k], payload[k])
980 self.assertEquals(data['id'], expected_id)
e2367534 981
c1374bdb 982 def test_rename_auth_zone(self):
37bc3d01
CH
983 payload, data = self.create_zone(kind='Native')
984 name = payload['name'] + '.'
e2367534
CH
985 # now rename it
986 payload = {
987 'name': 'renamed-'+name,
988 'kind': 'Native',
989 'recursion_desired': False
990 }
991 r = self.session.put(
992 self.url("/servers/localhost/zones/" + name),
993 data=json.dumps(payload),
994 headers={'content-type': 'application/json'})
c1374bdb 995 self.assert_success_json(r)
e2367534
CH
996 data = r.json()
997 for k in payload.keys():
998 self.assertEquals(data[k], payload[k])
37bc3d01 999
37663c3b
CH
1000 def test_zone_delete(self):
1001 payload, zone = self.create_zone(kind='Native')
1002 name = payload['name']
1003 r = self.session.delete(self.url("/servers/localhost/zones/" + name))
1004 self.assertEquals(r.status_code, 204)
1005 self.assertNotIn('Content-Type', r.headers)
1006
c1374bdb 1007 def test_search_rr_exact_zone(self):
37bc3d01
CH
1008 name = unique_zone_name() + '.'
1009 self.create_zone(name=name, kind='Native')
1010 r = self.session.get(self.url("/servers/localhost/search-data?q=" + name))
c1374bdb 1011 self.assert_success_json(r)
37bc3d01
CH
1012 print r.json()
1013 self.assertEquals(r.json(), [{u'type': u'zone', u'name': name, u'zone_id': name}])
1014
c1374bdb 1015 def test_search_rr_substring(self):
37bc3d01
CH
1016 name = 'search-rr-zone.name'
1017 self.create_zone(name=name, kind='Native')
1018 r = self.session.get(self.url("/servers/localhost/search-data?q=rr-zone"))
c1374bdb 1019 self.assert_success_json(r)
37bc3d01
CH
1020 print r.json()
1021 # should return zone, SOA
1022 self.assertEquals(len(r.json()), 2)