import unittest

import requests

from test_helper import ApiTestCase, is_auth, is_recursor
class Servers(ApiTestCase):
    """Tests for the ``/api/v1/servers`` endpoints and the ``/metrics`` endpoint.

    Relies on ``ApiTestCase`` for ``self.session``, ``self.url()``,
    ``self.assert_success_json()`` and the web server credentials.
    """

    # The server list must contain exactly one entry, 'localhost'.
    def test_list_servers(self):
        r = self.session.get(self.url("/api/v1/servers"))
        self.assert_success_json(r)
        lst = r.json()
        self.assertEqual(len(lst), 1)  # only localhost allowed in there
        data = lst[0]
        for k in ('id', 'daemon_type', 'url'):
            self.assertIn(k, data)
        self.assertEqual(data['id'], 'localhost')

    # The 'localhost' server object carries all expected keys and reports the
    # daemon type matching the daemon under test.
    def test_servers_localhost(self):
        r = self.session.get(self.url("/api/v1/servers/localhost"))
        self.assert_success_json(r)
        data = r.json()
        for k in ('id', 'type', 'version', 'daemon_type', 'url', 'zones_url', 'config_url'):
            self.assertIn(k, data)
        self.assertEqual(data['id'], 'localhost')
        self.assertEqual(data['type'], 'Server')
        # or 'recursor' for recursors
        if is_auth():
            daemon_type = 'authoritative'
        elif is_recursor():
            daemon_type = 'recursor'
        else:
            raise RuntimeError('Unknown daemon type')
        self.assertEqual(data['daemon_type'], daemon_type)

    # The config endpoint returns name/value entries, including 'daemon'.
    def test_read_config(self):
        r = self.session.get(self.url("/api/v1/servers/localhost/config"))
        self.assert_success_json(r)
        # Use a distinct loop name so the response object `r` is not shadowed.
        data = {item['name']: item['value'] for item in r.json()}
        self.assertIn('daemon', data)

    # The statistics endpoint reports 'uptime'; the map/ring statistics are
    # checked for the entries the test traffic is expected to have produced.
    def test_read_statistics(self):
        r = self.session.get(self.url("/api/v1/servers/localhost/statistics"))
        self.assert_success_json(r)
        data = r.json()
        self.assertIn('uptime', [e['name'] for e in data])
        # NOTE(review): the guard below is reconstructed; the map/ring
        # statistics checked here are presumably authoritative-only - confirm.
        if is_auth():
            qtype_stats, respsize_stats, queries_stats, rcode_stats = None, None, None, None
            for elem in data:
                if elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-by-qtype':
                    qtype_stats = elem['value']
                elif elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-sizes':
                    respsize_stats = elem['value']
                elif elem['type'] == 'RingStatisticItem' and elem['name'] == 'queries':
                    queries_stats = elem['value']
                elif elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-by-rcode':
                    rcode_stats = elem['value']
            # Fail with a readable message instead of a TypeError when a
            # statistic is absent from the response.
            self.assertIsNotNone(qtype_stats, "response-by-qtype is missing")
            self.assertIsNotNone(respsize_stats, "response-sizes is missing")
            self.assertIsNotNone(queries_stats, "queries is missing")
            self.assertIsNotNone(rcode_stats, "response-by-rcode is missing")
            self.assertIn('A', [e['name'] for e in qtype_stats])
            self.assertIn('60', [e['name'] for e in respsize_stats])
            self.assertIn('example.com/A', [e['name'] for e in queries_stats])
            self.assertIn('No Error', [e['name'] for e in rcode_stats])

    # Requesting a single statistic by name returns that statistic.
    def test_read_one_statistic(self):
        r = self.session.get(self.url("/api/v1/servers/localhost/statistics?statistic=uptime"))
        self.assert_success_json(r)
        data = r.json()
        self.assertIn('uptime', [e['name'] for e in data])

    # Requesting an unknown statistic yields HTTP 422 with an error message.
    def test_read_one_non_existent_statistic(self):
        r = self.session.get(self.url("/api/v1/servers/localhost/statistics?statistic=uptimeAAAA"))
        self.assertEqual(r.status_code, 422)
        self.assertIn("Unknown statistic name", r.json()['error'])

    # The /metrics endpoint exposes the 'pdns_recursor_uptime' metric.
    def test_read_metrics(self):
        # NOTE(review): guard reconstructed; the metric name checked below is
        # recursor-specific, so other daemons are presumably exempt - confirm.
        if not is_recursor():
            return
        res = self.session.get(self.url("/metrics"), auth=('whatever', self.webServerBasicAuthPassword), timeout=2.0)
        self.assertEqual(res.status_code, 200)
        # The metric name is the first space-separated token of its line;
        # comment lines start with '#' and therefore never match.
        found = any(
            line.split(" ")[0] == "pdns_recursor_uptime"
            for line in res.text.splitlines()
        )
        self.assertTrue(found, "pdns_recursor_uptime is missing")

    # Statistics can also be read with HTTP basic auth using the web password,
    # via a plain `requests` call rather than `self.session`.
    @unittest.skipIf(is_auth(), "Not applicable")
    def test_read_statistics_using_password(self):
        r = requests.get(self.url("/api/v1/servers/localhost/statistics"), auth=('admin', self.server_web_password))
        self.assertEqual(r.status_code, requests.codes.ok)
        self.assert_success_json(r)