6 from test_helper
import ApiTestCase
, is_auth
, is_recursor
, is_auth_lmdb
9 class Servers(ApiTestCase
):
11 def test_list_servers(self
):
12 r
= self
.session
.get(self
.url("/api/v1/servers"))
13 self
.assert_success_json(r
)
15 self
.assertEqual(len(lst
), 1) # only localhost allowed in there
17 for k
in ('id', 'daemon_type', 'url'):
18 self
.assertIn(k
, data
)
19 self
.assertEqual(data
['id'], 'localhost')
21 def test_servers_localhost(self
):
22 r
= self
.session
.get(self
.url("/api/v1/servers/localhost"))
23 self
.assert_success_json(r
)
25 for k
in ('id', 'type', 'version', 'daemon_type', 'url', 'zones_url', 'config_url'):
26 self
.assertIn(k
, data
)
27 self
.assertEqual(data
['id'], 'localhost')
28 self
.assertEqual(data
['type'], 'Server')
29 # or 'recursor' for recursors
31 daemon_type
= 'authoritative'
33 daemon_type
= 'recursor'
35 raise RuntimeError('Unknown daemon type')
36 self
.assertEqual(data
['daemon_type'], daemon_type
)
38 def test_read_config(self
):
39 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/config"))
40 self
.assert_success_json(r
)
41 data
= dict([(r
['name'], r
['value']) for r
in r
.json()])
42 self
.assertIn('daemon', data
)
44 def test_read_statistics(self
):
45 # Use low-level API as we want to create an invalid request to test log line encoding
46 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
);
47 sock
.connect((self
.server_address
, self
.server_port
))
48 sock
.send(b
'GET /binary\x00\x01\xeb HTTP/1.0\r\n')
50 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/statistics"))
51 self
.assert_success_json(r
)
53 self
.assertIn('uptime', [e
['name'] for e
in data
])
56 qtype_stats
, respsize_stats
, queries_stats
, rcode_stats
, logmessages
= None, None, None, None, None
58 if elem
['type'] == 'MapStatisticItem' and elem
['name'] == 'response-by-qtype':
59 qtype_stats
= elem
['value']
60 elif elem
['type'] == 'MapStatisticItem' and elem
['name'] == 'response-sizes':
61 respsize_stats
= elem
['value']
62 elif elem
['type'] == 'RingStatisticItem' and elem
['name'] == 'queries':
63 queries_stats
= elem
['value']
64 elif elem
['type'] == 'MapStatisticItem' and elem
['name'] == 'response-by-rcode':
65 rcode_stats
= elem
['value']
66 elif elem
['type'] == 'RingStatisticItem' and elem
['name'] == 'logmessages':
67 logmessages
= elem
['value']
68 self
.assertIn('A', [e
['name'] for e
in qtype_stats
])
69 self
.assertIn('80', [e
['name'] for e
in respsize_stats
])
70 self
.assertIn('example.com/A', [e
['name'] for e
in queries_stats
])
71 self
.assertIn('No Error', [e
['name'] for e
in rcode_stats
])
72 self
.assertTrue(logmessages
[0]['name'].startswith('[webserver]'))
74 qtype_stats
, respsize_stats
, rcode_stats
= None, None, None
76 if elem
['type'] == 'MapStatisticItem' and elem
['name'] == 'response-by-qtype':
77 qtype_stats
= elem
['value']
78 elif elem
['type'] == 'MapStatisticItem' and elem
['name'] == 'response-sizes':
79 respsize_stats
= elem
['value']
80 elif elem
['type'] == 'MapStatisticItem' and elem
['name'] == 'response-by-rcode':
81 rcode_stats
= elem
['value']
82 self
.assertIn('A', [e
['name'] for e
in qtype_stats
])
83 self
.assertIn('60', [e
['name'] for e
in respsize_stats
])
84 self
.assertIn('80', [e
['name'] for e
in respsize_stats
])
85 self
.assertIn('No Error', [e
['name'] for e
in rcode_stats
])
87 def test_read_one_statistic(self
):
88 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/statistics?statistic=uptime"))
89 self
.assert_success_json(r
)
91 self
.assertIn('uptime', [e
['name'] for e
in data
])
93 def test_read_one_non_existent_statistic(self
):
94 r
= self
.session
.get(self
.url("/api/v1/servers/localhost/statistics?statistic=uptimeAAAA"))
95 self
.assertEqual(r
.status_code
, 422)
96 self
.assertIn("Unknown statistic name", r
.json()['error'])
98 def test_read_metrics(self
):
100 res
= self
.session
.get(self
.url("/metrics"), auth
=('whatever', self
.webServerBasicAuthPassword
), timeout
=2.0)
101 self
.assertEqual(res
.status_code
, 200)
104 for line
in res
.text
.splitlines():
107 if line
.split(" ")[0] == "pdns_recursor_uptime":
109 self
.assertTrue(found
,"pdns_recursor_uptime is missing")
111 @unittest.skipIf(is_auth(), "Not applicable")
112 def test_read_statistics_using_password(self
):
113 r
= requests
.get(self
.url("/api/v1/servers/localhost/statistics"), auth
=('admin', self
.server_web_password
))
114 self
.assertEqual(r
.status_code
, requests
.codes
.ok
)
115 self
.assert_success_json(r
)
117 @unittest.skipIf(is_recursor(), "Not applicable")
118 @unittest.skipIf(is_auth_lmdb(), "No autoprimary management in LMDB yet")
119 def test_autoprimaries(self
):
120 # verify that we have zero autoprimaries
121 res
= self
.session
.get(self
.url("/api/v1/servers/localhost/autoprimaries"), auth
=('whatever', self
.webServerBasicAuthPassword
), timeout
=2.0)
122 self
.assertEqual(res
.status_code
, requests
.codes
.ok
)
123 self
.assertEqual(res
.json(), [])
128 'nameserver': 'ns.example.com'
131 res
= self
.session
.post(
132 self
.url("/api/v1/servers/localhost/autoprimaries"),
133 data
=json
.dumps(payload
),
134 headers
={'content-type': 'application/json'})
136 self
.assertEqual(res
.status_code
, 201)
138 # check that it's there
139 res
= self
.session
.get(self
.url("/api/v1/servers/localhost/autoprimaries"), auth
=('whatever', self
.webServerBasicAuthPassword
), timeout
=2.0)
140 self
.assertEqual(res
.status_code
, requests
.codes
.ok
)
141 self
.assertEqual(res
.json(), [{'account': '', 'ip': '192.0.2.1', 'nameserver': 'ns.example.com'}])
143 # add another one, this time with an account field
146 'nameserver': 'ns.example.org',
150 res
= self
.session
.post(
151 self
.url("/api/v1/servers/localhost/autoprimaries"),
152 data
=json
.dumps(payload
),
153 headers
={'content-type': 'application/json'})
155 self
.assertEqual(res
.status_code
, 201)
157 # check that both are there
158 res
= self
.session
.get(self
.url("/api/v1/servers/localhost/autoprimaries"), auth
=('whatever', self
.webServerBasicAuthPassword
), timeout
=2.0)
159 self
.assertEqual(res
.status_code
, requests
.codes
.ok
)
160 self
.assertEqual(len(res
.json()), 2)
161 self
.assertEqual(sorted(res
.json(), key
=operator
.itemgetter('ip')), [
162 {'account': '', 'ip': '192.0.2.1', 'nameserver': 'ns.example.com'},
163 {'account': 'test', 'ip': '192.0.2.2', 'nameserver': 'ns.example.org'}
167 res
= self
.session
.delete(
168 self
.url("/api/v1/servers/localhost/autoprimaries/192.0.2.2/ns.example.org"),
169 headers
={'content-type': 'application/json"'})
171 self
.assertEqual(res
.status_code
, 204)
173 # check that we are back to just one
174 res
= self
.session
.get(self
.url("/api/v1/servers/localhost/autoprimaries"), auth
=('whatever', self
.webServerBasicAuthPassword
), timeout
=2.0)
175 self
.assertEqual(res
.status_code
, requests
.codes
.ok
)
176 self
.assertEqual(res
.json(), [{'account': '', 'ip': '192.0.2.1', 'nameserver': 'ns.example.com'}])