]>
Commit | Line | Data |
---|---|---|
1 | #!/usr/bin/env python | |
2 | import os.path | |
3 | ||
4 | import base64 | |
5 | import json | |
6 | import requests | |
7 | from dnsdisttests import DNSDistTest | |
8 | ||
class TestAPIBasics(DNSDistTest):
    """Check authentication and JSON layout of the built-in web server.

    The server is started with a basic-auth password and an API key (see
    ``_config_template``).  The tests verify which credential opens which
    path, and that every documented endpoint returns the expected keys
    with sane (non-negative) values.
    """

    _webTimeout = 2.0
    _webServerPort = 8083
    _webServerBasicAuthPassword = 'secret'
    _webServerAPIKey = 'apisecret'
    # paths accessible using the API key only
    _apiOnlyPaths = ['/api/v1/servers/localhost/config', '/api/v1/servers/localhost/config/allow-from', '/api/v1/servers/localhost/statistics']
    # paths accessible using an API key or basic auth
    _statsPaths = [ '/jsonstat?command=stats', '/jsonstat?command=dynblocklist', '/api/v1/servers/localhost']
    # paths accessible using basic auth only (list not exhaustive)
    _basicOnlyPaths = ['/', '/index.html']
    # values substituted, in order, into the %s placeholders of _config_template
    _config_params = ['_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
    _config_template = """
    setACL({"127.0.0.1/32", "::1/128"})
    newServer{address="127.0.0.1:%s"}
    webserver("127.0.0.1:%s", "%s", "%s")
    """

    def testBasicAuth(self):
        """
        API: Basic Authentication
        """
        for path in self._basicOnlyPaths + self._statsPaths:
            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
            # a wrong password must be rejected...
            r = requests.get(url, auth=('whatever', "evilsecret"), timeout=self._webTimeout)
            self.assertEqual(r.status_code, 401)
            # ...and the configured one accepted (the user name is arbitrary)
            r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
            # a requests Response is truthy only for non-error status codes
            self.assertTrue(r)
            self.assertEqual(r.status_code, 200)

    def testXAPIKey(self):
        """
        API: X-Api-Key
        """
        headers = {'x-api-key': self._webServerAPIKey}
        for path in self._apiOnlyPaths + self._statsPaths:
            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
            r = requests.get(url, headers=headers, timeout=self._webTimeout)
            self.assertTrue(r)
            self.assertEqual(r.status_code, 200)

    def testWrongXAPIKey(self):
        """
        API: Wrong X-Api-Key
        """
        headers = {'x-api-key': "evilapikey"}
        for path in self._apiOnlyPaths + self._statsPaths:
            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
            r = requests.get(url, headers=headers, timeout=self._webTimeout)
            self.assertEqual(r.status_code, 401)

    def testBasicAuthOnly(self):
        """
        API: Basic Authentication Only
        """
        # a valid API key must not open the basic-auth-only paths
        headers = {'x-api-key': self._webServerAPIKey}
        for path in self._basicOnlyPaths:
            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
            r = requests.get(url, headers=headers, timeout=self._webTimeout)
            self.assertEqual(r.status_code, 401)

    def testAPIKeyOnly(self):
        """
        API: API Key Only
        """
        # a valid basic-auth password must not open the API-key-only paths
        for path in self._apiOnlyPaths:
            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
            r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
            self.assertEqual(r.status_code, 401)

    def testServersLocalhost(self):
        """
        API: /api/v1/servers/localhost
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)

        self.assertEqual(content['daemon_type'], 'dnsdist')

        rule_groups = ['response-rules', 'cache-hit-response-rules', 'self-answered-response-rules', 'rules']
        for key in ['version', 'acl', 'local', 'servers', 'frontends', 'pools'] + rule_groups:
            self.assertIn(key, content)

        for rule_group in rule_groups:
            for rule in content[rule_group]:
                for key in ['id', 'creationOrder', 'matches', 'rule', 'action', 'uuid']:
                    self.assertIn(key, rule)
                for key in ['id', 'creationOrder', 'matches']:
                    self.assertGreaterEqual(rule[key], 0)

        for server in content['servers']:
            for key in ['id', 'latency', 'name', 'weight', 'outstanding', 'qpsLimit',
                        'reuseds', 'state', 'address', 'pools', 'qps', 'queries', 'order', 'sendErrors',
                        'dropRate']:
                self.assertIn(key, server)

            for key in ['id', 'latency', 'weight', 'outstanding', 'qpsLimit', 'reuseds',
                        'qps', 'queries', 'order']:
                self.assertGreaterEqual(server[key], 0)

            self.assertIn(server['state'], ['up', 'down', 'UP', 'DOWN'])

        for frontend in content['frontends']:
            for key in ['id', 'address', 'udp', 'tcp', 'type', 'queries']:
                self.assertIn(key, frontend)

            for key in ['id', 'queries']:
                self.assertGreaterEqual(frontend[key], 0)

        for pool in content['pools']:
            for key in ['id', 'name', 'cacheSize', 'cacheEntries', 'cacheHits', 'cacheMisses', 'cacheDeferredInserts', 'cacheDeferredLookups', 'cacheLookupCollisions', 'cacheInsertCollisions', 'cacheTTLTooShorts']:
                self.assertIn(key, pool)

            for key in ['id', 'cacheSize', 'cacheEntries', 'cacheHits', 'cacheMisses', 'cacheDeferredInserts', 'cacheDeferredLookups', 'cacheLookupCollisions', 'cacheInsertCollisions', 'cacheTTLTooShorts']:
                self.assertGreaterEqual(pool[key], 0)

    def testServersIDontExist(self):
        """
        API: /api/v1/servers/idonotexist (should be 404)
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/idonotexist'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertEqual(r.status_code, 404)

    def testServersLocalhostConfig(self):
        """
        API: /api/v1/servers/localhost/config
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost/config'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)

        # flatten the list of {type, name, value} entries into name -> value
        values = {}
        for entry in content:
            for key in ['type', 'name', 'value']:
                self.assertIn(key, entry)

            self.assertEqual(entry['type'], 'ConfigSetting')
            values[entry['name']] = entry['value']

        for key in ['acl', 'control-socket', 'ecs-override', 'ecs-source-prefix-v4',
                    'ecs-source-prefix-v6', 'fixup-case', 'max-outstanding', 'server-policy',
                    'stale-cache-entries-ttl', 'tcp-recv-timeout', 'tcp-send-timeout',
                    'truncate-tc', 'verbose', 'verbose-health-checks']:
            self.assertIn(key, values)

        for key in ['max-outstanding', 'stale-cache-entries-ttl', 'tcp-recv-timeout',
                    'tcp-send-timeout']:
            self.assertGreaterEqual(values[key], 0)

        # prefix lengths are bounded by the address family's bit width
        self.assertGreaterEqual(values['ecs-source-prefix-v4'], 0)
        self.assertLessEqual(values['ecs-source-prefix-v4'], 32)
        self.assertGreaterEqual(values['ecs-source-prefix-v6'], 0)
        self.assertLessEqual(values['ecs-source-prefix-v6'], 128)

    def testServersLocalhostConfigAllowFrom(self):
        """
        API: /api/v1/servers/localhost/config/allow-from
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost/config/allow-from'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)
        for key in ['type', 'name', 'value']:
            self.assertIn(key, content)

        self.assertEqual(content['name'], 'allow-from')
        self.assertEqual(content['type'], 'ConfigSetting')
        # the ACL set by _config_template; compared sorted, so the order
        # of the returned entries does not matter
        expectedACL = ["127.0.0.1/32", "::1/128"]
        self.assertEqual(sorted(content['value']), sorted(expectedACL))

    def testServersLocalhostConfigAllowFromPut(self):
        """
        API: PUT /api/v1/servers/localhost/config/allow-from (should be refused)

        The API is read-only by default, so this should be refused
        """
        newACL = ["192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"]
        payload = json.dumps({"name": "allow-from",
                              "type": "ConfigSetting",
                              "value": newACL})
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost/config/allow-from'
        r = requests.put(url, headers=headers, timeout=self._webTimeout, data=payload)
        # an error response is falsy
        self.assertFalse(r)
        self.assertEqual(r.status_code, 405)

    def testServersLocalhostStatistics(self):
        """
        API: /api/v1/servers/localhost/statistics
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost/statistics'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)

        # flatten the list of {type, name, value} entries into name -> value
        values = {}
        for entry in content:
            self.assertIn('type', entry)
            self.assertIn('name', entry)
            self.assertIn('value', entry)
            self.assertEqual(entry['type'], 'StatisticItem')
            values[entry['name']] = entry['value']

        expected = ['responses', 'servfail-responses', 'queries', 'acl-drops',
                    'frontend-noerror', 'frontend-nxdomain', 'frontend-servfail',
                    'rule-drop', 'rule-nxdomain', 'rule-refused', 'self-answered', 'downstream-timeouts',
                    'downstream-send-errors', 'trunc-failures', 'no-policy', 'latency0-1',
                    'latency1-10', 'latency10-50', 'latency50-100', 'latency100-1000',
                    'latency-slow', 'latency-sum', 'latency-count', 'latency-avg100', 'latency-avg1000',
                    'latency-avg10000', 'latency-avg1000000', 'uptime', 'real-memory-usage', 'noncompliant-queries',
                    'noncompliant-responses', 'rdqueries', 'empty-queries', 'cache-hits',
                    'cache-misses', 'cpu-iowait', 'cpu-steal', 'cpu-sys-msec', 'cpu-user-msec', 'fd-usage', 'dyn-blocked',
                    'dyn-block-nmg-size', 'rule-servfail', 'security-status',
                    'udp-in-errors', 'udp-noport-errors', 'udp-recvbuf-errors', 'udp-sndbuf-errors',
                    'doh-query-pipe-full', 'doh-response-pipe-full']

        # every expected metric is present and non-negative...
        for key in expected:
            self.assertIn(key, values)
            self.assertGreaterEqual(values[key], 0)

        # ...and the server exposes no metric this test does not know about
        for key in values:
            self.assertIn(key, expected)

    def testJsonstatStats(self):
        """
        API: /jsonstat?command=stats
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/jsonstat?command=stats'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)

        expected = ['responses', 'servfail-responses', 'queries', 'acl-drops',
                    'frontend-noerror', 'frontend-nxdomain', 'frontend-servfail',
                    'rule-drop', 'rule-nxdomain', 'rule-refused', 'self-answered', 'downstream-timeouts',
                    'downstream-send-errors', 'trunc-failures', 'no-policy', 'latency0-1',
                    'latency1-10', 'latency10-50', 'latency50-100', 'latency100-1000',
                    'latency-slow', 'latency-avg100', 'latency-avg1000', 'latency-avg10000',
                    'latency-avg1000000', 'uptime', 'real-memory-usage', 'noncompliant-queries',
                    'noncompliant-responses', 'rdqueries', 'empty-queries', 'cache-hits',
                    'cache-misses', 'cpu-user-msec', 'cpu-sys-msec', 'fd-usage', 'dyn-blocked',
                    'dyn-block-nmg-size', 'packetcache-hits', 'packetcache-misses', 'over-capacity-drops',
                    'too-old-drops']

        for key in expected:
            self.assertIn(key, content)
            self.assertGreaterEqual(content[key], 0)

    def testJsonstatDynblocklist(self):
        """
        API: /jsonstat?command=dynblocklist
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/jsonstat?command=dynblocklist'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)

        content = r.json()

        # an empty answer (nothing is blocked) is valid, so the layout is
        # only checked when something was returned
        # NOTE(review): the keys are looked up on the top-level object, not
        # on per-block entries - confirm this matches the server's layout
        if content:
            for key in ['reason', 'seconds', 'blocks', 'action']:
                self.assertIn(key, content)

            for key in ['blocks']:
                self.assertGreaterEqual(content[key], 0)
295 | ||
class TestAPIServerDown(DNSDistTest):
    """Check how a backend that has been forced down is reported by the API.

    The configuration marks the only backend as down with
    ``getServer(0):setDown()`` before the web server is started.
    """

    _webTimeout = 2.0
    _webServerPort = 8083
    _webServerBasicAuthPassword = 'secret'
    _webServerAPIKey = 'apisecret'
    # values substituted, in order, into the %s placeholders of _config_template
    _config_params = ['_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
    _config_template = """
    setACL({"127.0.0.1/32", "::1/128"})
    newServer{address="127.0.0.1:%s"}
    getServer(0):setDown()
    webserver("127.0.0.1:%s", "%s", "%s")
    """

    def testServerDownNoLatencyLocalhost(self):
        """
        API: /api/v1/servers/localhost, no latency for a down server
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)

        # the latency of the down backend is reported as JSON null
        self.assertIsNone(content['servers'][0]['latency'])
324 | ||
class TestAPIWritable(DNSDistTest):
    """Check that the API accepts changes once it has been made writable.

    The configuration calls ``setAPIWritable(true, "<dir>")`` with
    ``_APIWriteDir``; the test then expects the ACL set through the API to
    be written to ``acl.conf`` in that directory.
    """

    _webTimeout = 2.0
    _webServerPort = 8083
    _webServerBasicAuthPassword = 'secret'
    _webServerAPIKey = 'apisecret'
    _APIWriteDir = '/tmp'
    # values substituted, in order, into the %s placeholders of _config_template
    _config_params = ['_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey', '_APIWriteDir']
    _config_template = """
    setACL({"127.0.0.1/32", "::1/128"})
    newServer{address="127.0.0.1:%s"}
    webserver("127.0.0.1:%s", "%s", "%s")
    setAPIWritable(true, "%s")
    """

    def testSetACL(self):
        """
        API: Set ACL
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost/config/allow-from'

        # the ACL initially matches the one from the configuration
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)
        expectedACL = ["127.0.0.1/32", "::1/128"]
        self.assertEqual(sorted(content['value']), sorted(expectedACL))

        # replace it; the PUT answer echoes the new ACL.  newACL is already
        # in sorted order, so it can be compared to the sorted answer as is.
        newACL = ["192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"]
        payload = json.dumps({"name": "allow-from",
                              "type": "ConfigSetting",
                              "value": newACL})
        r = requests.put(url, headers=headers, timeout=self._webTimeout, data=payload)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)
        self.assertEqual(sorted(content['value']), newACL)

        # a subsequent GET returns the new ACL as well
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)
        self.assertEqual(sorted(content['value']), newACL)

        # the change must also have been written to the API directory
        configFile = os.path.join(self._APIWriteDir, 'acl.conf')
        self.assertTrue(os.path.isfile(configFile))
        with open(configFile, 'rt') as f:
            header = f.readline()
            body = f.readline()

        self.assertEqual(header, """-- Generated by the REST API, DO NOT EDIT\n""")

        # the entries may be written in any order, so accept every permutation
        self.assertIn(body, {
            """setACL({"192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"})\n""",
            """setACL({"192.0.2.0/24", "203.0.113.0/24", "198.51.100.0/24"})\n""",
            """setACL({"198.51.100.0/24", "192.0.2.0/24", "203.0.113.0/24"})\n""",
            """setACL({"198.51.100.0/24", "203.0.113.0/24", "192.0.2.0/24"})\n""",
            """setACL({"203.0.113.0/24", "192.0.2.0/24", "198.51.100.0/24"})\n""",
            """setACL({"203.0.113.0/24", "198.51.100.0/24", "192.0.2.0/24"})\n"""
        })
396 | ||
class TestAPICustomHeaders(DNSDistTest):
    """Check the custom HTTP response headers of the built-in web server.

    The web server is configured with a table of custom headers, and a
    console (control socket) is opened so the headers can be changed at
    runtime with ``setWebserverConfig``.
    """

    _webTimeout = 2.0
    _webServerPort = 8083
    _webServerBasicAuthPassword = 'secret'
    _webServerAPIKey = 'apisecret'
    # path accessible using the API key only
    _apiOnlyPath = '/api/v1/servers/localhost/config'
    # path accessible using basic auth only
    _basicOnlyPath = '/'
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    # values substituted, in order, into the %s placeholders of _config_template
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    setACL({"127.0.0.1/32", "::1/128"})
    newServer({address="127.0.0.1:%s"})
    webserver("127.0.0.1:%s", "%s", "%s", {["X-Frame-Options"]="", ["X-Custom"]="custom"})
    """

    def testBasicHeaders(self):
        """
        API: Basic custom headers
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath

        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        # the header given a value is sent; the one configured with an empty
        # value is expected to be absent from the response
        self.assertEqual(r.headers.get('x-custom'), "custom")
        self.assertNotIn("x-frame-options", r.headers)

    def testBasicHeadersUpdate(self):
        """
        API: Basic update of custom headers
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
        # replace the custom headers at runtime through the console
        self.sendConsoleCommand('setWebserverConfig({customHeaders={["x-powered-by"]="dnsdist"}})')
        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        self.assertEqual(r.headers.get('x-powered-by'), "dnsdist")
        # X-Frame-Options is no longer overridden, so it is expected back
        self.assertIn("x-frame-options", r.headers)
443 | ||
444 | ||
class TestAPIAuth(DNSDistTest):
    """Check that web server credentials can be changed at runtime.

    Each test sends ``setWebserverConfig`` over the console and then
    verifies which credentials the web server accepts afterwards.
    """

    _webTimeout = 2.0
    _webServerPort = 8083
    _webServerBasicAuthPassword = 'secret'
    _webServerBasicAuthPasswordNew = 'password'
    _webServerAPIKey = 'apisecret'
    _webServerAPIKeyNew = 'apipassword'
    # path accessible using the API key only
    _apiOnlyPath = '/api/v1/servers/localhost/config'
    # path accessible using basic auth only
    _basicOnlyPath = '/'
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    # values substituted, in order, into the %s placeholders of _config_template
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    setACL({"127.0.0.1/32", "::1/128"})
    newServer{address="127.0.0.1:%s"}
    webserver("127.0.0.1:%s", "%s", "%s")
    """

    def testBasicAuthChange(self):
        """
        API: Basic Authentication updating credentials
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
        self.sendConsoleCommand('setWebserverConfig({{password="{}"}})'.format(self._webServerBasicAuthPasswordNew))

        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPasswordNew), timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)

        # Make sure the old password is not usable any more
        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
        self.assertEqual(r.status_code, 401)

    def testXAPIKeyChange(self):
        """
        API: X-Api-Key updating credentials
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._apiOnlyPath
        self.sendConsoleCommand('setWebserverConfig({{apiKey="{}"}})'.format(self._webServerAPIKeyNew))

        headers = {'x-api-key': self._webServerAPIKeyNew}
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)

        # Make sure the old API key is not usable any more
        headers = {'x-api-key': self._webServerAPIKey}
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertEqual(r.status_code, 401)

    def testBasicAuthOnlyChange(self):
        """
        API: X-Api-Key updated to none (disabled)
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._apiOnlyPath
        self.sendConsoleCommand('setWebserverConfig({{apiKey="{}"}})'.format(self._webServerAPIKeyNew))

        headers = {'x-api-key': self._webServerAPIKeyNew}
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)

        # now disable apiKey
        self.sendConsoleCommand('setWebserverConfig({apiKey=""})')

        # the key that worked a moment ago must now be refused
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertEqual(r.status_code, 401)