]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.api/test_Servers.py
Merge pull request #12100 from rgacogne/ddist-single-soa-ixfr
[thirdparty/pdns.git] / regression-tests.api / test_Servers.py
1 import json
2 import operator
3 import requests
4 import unittest
5 import socket
6 from test_helper import ApiTestCase, is_auth, is_recursor, is_auth_lmdb
7
8
9 class Servers(ApiTestCase):
10
11 def test_list_servers(self):
12 r = self.session.get(self.url("/api/v1/servers"))
13 self.assert_success_json(r)
14 lst = r.json()
15 self.assertEqual(len(lst), 1) # only localhost allowed in there
16 data = lst[0]
17 for k in ('id', 'daemon_type', 'url'):
18 self.assertIn(k, data)
19 self.assertEqual(data['id'], 'localhost')
20
21 def test_servers_localhost(self):
22 r = self.session.get(self.url("/api/v1/servers/localhost"))
23 self.assert_success_json(r)
24 data = r.json()
25 for k in ('id', 'type', 'version', 'daemon_type', 'url', 'zones_url', 'config_url'):
26 self.assertIn(k, data)
27 self.assertEqual(data['id'], 'localhost')
28 self.assertEqual(data['type'], 'Server')
29 # or 'recursor' for recursors
30 if is_auth():
31 daemon_type = 'authoritative'
32 elif is_recursor():
33 daemon_type = 'recursor'
34 else:
35 raise RuntimeError('Unknown daemon type')
36 self.assertEqual(data['daemon_type'], daemon_type)
37
38 def test_read_config(self):
39 r = self.session.get(self.url("/api/v1/servers/localhost/config"))
40 self.assert_success_json(r)
41 data = dict([(r['name'], r['value']) for r in r.json()])
42 self.assertIn('daemon', data)
43
44 def test_read_statistics(self):
45 # Use low-level API as we want to create an invalid request to test log line encoding
46 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM);
47 sock.connect((self.server_address, self.server_port))
48 sock.send(b'GET /binary\x00\x01\xeb HTTP/1.0\r\n')
49 sock.close()
50 r = self.session.get(self.url("/api/v1/servers/localhost/statistics"))
51 self.assert_success_json(r)
52 data = r.json()
53 self.assertIn('uptime', [e['name'] for e in data])
54 print(data)
55 if is_auth():
56 qtype_stats, respsize_stats, queries_stats, rcode_stats, logmessages = None, None, None, None, None
57 for elem in data:
58 if elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-by-qtype':
59 qtype_stats = elem['value']
60 elif elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-sizes':
61 respsize_stats = elem['value']
62 elif elem['type'] == 'RingStatisticItem' and elem['name'] == 'queries':
63 queries_stats = elem['value']
64 elif elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-by-rcode':
65 rcode_stats = elem['value']
66 elif elem['type'] == 'RingStatisticItem' and elem['name'] == 'logmessages':
67 logmessages = elem['value']
68 self.assertIn('A', [e['name'] for e in qtype_stats])
69 self.assertIn('80', [e['name'] for e in respsize_stats])
70 self.assertIn('example.com/A', [e['name'] for e in queries_stats])
71 self.assertIn('No Error', [e['name'] for e in rcode_stats])
72 self.assertTrue(logmessages[0]['name'].startswith('[webserver]'))
73 else:
74 qtype_stats, respsize_stats, rcode_stats = None, None, None
75 for elem in data:
76 if elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-by-qtype':
77 qtype_stats = elem['value']
78 elif elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-sizes':
79 respsize_stats = elem['value']
80 elif elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-by-rcode':
81 rcode_stats = elem['value']
82 self.assertIn('A', [e['name'] for e in qtype_stats])
83 self.assertIn('60', [e['name'] for e in respsize_stats])
84 self.assertIn('80', [e['name'] for e in respsize_stats])
85 self.assertIn('No Error', [e['name'] for e in rcode_stats])
86
87 def test_read_one_statistic(self):
88 r = self.session.get(self.url("/api/v1/servers/localhost/statistics?statistic=uptime"))
89 self.assert_success_json(r)
90 data = r.json()
91 self.assertIn('uptime', [e['name'] for e in data])
92
93 def test_read_one_non_existent_statistic(self):
94 r = self.session.get(self.url("/api/v1/servers/localhost/statistics?statistic=uptimeAAAA"))
95 self.assertEqual(r.status_code, 422)
96 self.assertIn("Unknown statistic name", r.json()['error'])
97
98 def test_read_metrics(self):
99 if is_recursor():
100 res = self.session.get(self.url("/metrics"), auth=('whatever', self.webServerBasicAuthPassword), timeout=2.0)
101 self.assertEqual(res.status_code, 200)
102 # print(res.text)
103 found = False
104 for line in res.text.splitlines():
105 if line[0] == "#":
106 continue
107 if line.split(" ")[0] == "pdns_recursor_uptime":
108 found = True
109 self.assertTrue(found,"pdns_recursor_uptime is missing")
110
111 @unittest.skipIf(is_auth(), "Not applicable")
112 def test_read_statistics_using_password(self):
113 r = requests.get(self.url("/api/v1/servers/localhost/statistics"), auth=('admin', self.server_web_password))
114 self.assertEqual(r.status_code, requests.codes.ok)
115 self.assert_success_json(r)
116
117 @unittest.skipIf(is_recursor(), "Not applicable")
118 @unittest.skipIf(is_auth_lmdb(), "No autoprimary management in LMDB yet")
119 def test_autoprimaries(self):
120 # verify that we have zero autoprimaries
121 res = self.session.get(self.url("/api/v1/servers/localhost/autoprimaries"), auth=('whatever', self.webServerBasicAuthPassword), timeout=2.0)
122 self.assertEqual(res.status_code, requests.codes.ok)
123 self.assertEqual(res.json(), [])
124
125 # add one
126 payload = {
127 'ip': '192.0.2.1',
128 'nameserver': 'ns.example.com'
129 }
130
131 res = self.session.post(
132 self.url("/api/v1/servers/localhost/autoprimaries"),
133 data=json.dumps(payload),
134 headers={'content-type': 'application/json'})
135
136 self.assertEqual(res.status_code, 201)
137
138 # check that it's there
139 res = self.session.get(self.url("/api/v1/servers/localhost/autoprimaries"), auth=('whatever', self.webServerBasicAuthPassword), timeout=2.0)
140 self.assertEqual(res.status_code, requests.codes.ok)
141 self.assertEqual(res.json(), [{'account': '', 'ip': '192.0.2.1', 'nameserver': 'ns.example.com'}])
142
143 # add another one, this time with an account field
144 payload = {
145 'ip': '192.0.2.2',
146 'nameserver': 'ns.example.org',
147 'account': 'test'
148 }
149
150 res = self.session.post(
151 self.url("/api/v1/servers/localhost/autoprimaries"),
152 data=json.dumps(payload),
153 headers={'content-type': 'application/json'})
154
155 self.assertEqual(res.status_code, 201)
156
157 # check that both are there
158 res = self.session.get(self.url("/api/v1/servers/localhost/autoprimaries"), auth=('whatever', self.webServerBasicAuthPassword), timeout=2.0)
159 self.assertEqual(res.status_code, requests.codes.ok)
160 self.assertEqual(len(res.json()), 2)
161 self.assertEqual(sorted(res.json(), key=operator.itemgetter('ip')), [
162 {'account': '', 'ip': '192.0.2.1', 'nameserver': 'ns.example.com'},
163 {'account': 'test', 'ip': '192.0.2.2', 'nameserver': 'ns.example.org'}
164 ])
165
166 # remove one
167 res = self.session.delete(
168 self.url("/api/v1/servers/localhost/autoprimaries/192.0.2.2/ns.example.org"),
169 headers={'content-type': 'application/json"'})
170
171 self.assertEqual(res.status_code, 204)
172
173 # check that we are back to just one
174 res = self.session.get(self.url("/api/v1/servers/localhost/autoprimaries"), auth=('whatever', self.webServerBasicAuthPassword), timeout=2.0)
175 self.assertEqual(res.status_code, requests.codes.ok)
176 self.assertEqual(res.json(), [{'account': '', 'ip': '192.0.2.1', 'nameserver': 'ns.example.com'}])