]>
Commit | Line | Data |
---|---|---|
1 | import requests | |
2 | import unittest | |
3 | from test_helper import ApiTestCase, is_auth, is_recursor | |
4 | ||
5 | ||
6 | class Servers(ApiTestCase): | |
7 | ||
8 | def test_list_servers(self): | |
9 | r = self.session.get(self.url("/api/v1/servers")) | |
10 | self.assert_success_json(r) | |
11 | lst = r.json() | |
12 | self.assertEquals(len(lst), 1) # only localhost allowed in there | |
13 | data = lst[0] | |
14 | for k in ('id', 'daemon_type', 'url'): | |
15 | self.assertIn(k, data) | |
16 | self.assertEquals(data['id'], 'localhost') | |
17 | ||
18 | def test_servers_localhost(self): | |
19 | r = self.session.get(self.url("/api/v1/servers/localhost")) | |
20 | self.assert_success_json(r) | |
21 | data = r.json() | |
22 | for k in ('id', 'type', 'version', 'daemon_type', 'url', 'zones_url', 'config_url'): | |
23 | self.assertIn(k, data) | |
24 | self.assertEquals(data['id'], 'localhost') | |
25 | self.assertEquals(data['type'], 'Server') | |
26 | # or 'recursor' for recursors | |
27 | if is_auth(): | |
28 | daemon_type = 'authoritative' | |
29 | elif is_recursor(): | |
30 | daemon_type = 'recursor' | |
31 | else: | |
32 | raise RuntimeError('Unknown daemon type') | |
33 | self.assertEquals(data['daemon_type'], daemon_type) | |
34 | ||
35 | def test_read_config(self): | |
36 | r = self.session.get(self.url("/api/v1/servers/localhost/config")) | |
37 | self.assert_success_json(r) | |
38 | data = dict([(r['name'], r['value']) for r in r.json()]) | |
39 | self.assertIn('daemon', data) | |
40 | ||
41 | def test_read_statistics(self): | |
42 | r = self.session.get(self.url("/api/v1/servers/localhost/statistics")) | |
43 | self.assert_success_json(r) | |
44 | data = r.json() | |
45 | self.assertIn('uptime', [e['name'] for e in data]) | |
46 | if is_auth(): | |
47 | print(data) | |
48 | qtype_stats, respsize_stats, queries_stats, rcode_stats = None, None, None, None | |
49 | for elem in data: | |
50 | if elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-by-qtype': | |
51 | qtype_stats = elem['value'] | |
52 | elif elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-sizes': | |
53 | respsize_stats = elem['value'] | |
54 | elif elem['type'] == 'RingStatisticItem' and elem['name'] == 'queries': | |
55 | queries_stats = elem['value'] | |
56 | elif elem['type'] == 'MapStatisticItem' and elem['name'] == 'response-by-rcode': | |
57 | rcode_stats = elem['value'] | |
58 | self.assertIn('A', [e['name'] for e in qtype_stats]) | |
59 | self.assertIn('60', [e['name'] for e in respsize_stats]) | |
60 | self.assertIn('example.com/A', [e['name'] for e in queries_stats]) | |
61 | self.assertIn('No Error', [e['name'] for e in rcode_stats]) | |
62 | ||
63 | def test_read_one_statistic(self): | |
64 | r = self.session.get(self.url("/api/v1/servers/localhost/statistics?statistic=uptime")) | |
65 | self.assert_success_json(r) | |
66 | data = r.json() | |
67 | self.assertIn('uptime', [e['name'] for e in data]) | |
68 | ||
69 | def test_read_one_non_existent_statistic(self): | |
70 | r = self.session.get(self.url("/api/v1/servers/localhost/statistics?statistic=uptimeAAAA")) | |
71 | self.assertEquals(r.status_code, 422) | |
72 | self.assertIn("Unknown statistic name", r.json()['error']) | |
73 | ||
74 | @unittest.skipIf(is_auth(), "Not applicable") | |
75 | def test_read_statistics_using_password(self): | |
76 | r = requests.get(self.url("/api/v1/servers/localhost/statistics"), auth=('admin', self.server_web_password)) | |
77 | self.assertEquals(r.status_code, requests.codes.ok) | |
78 | self.assert_success_json(r) |