]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_API.py
Merge pull request #9229 from rgacogne/dnsdist-webserver-allow-from
[thirdparty/pdns.git] / regression-tests.dnsdist / test_API.py
1 #!/usr/bin/env python
2 import os.path
3
4 import base64
5 import json
6 import requests
7 from dnsdisttests import DNSDistTest
8
class TestAPIBasics(DNSDistTest):
    """
    Exercise the dnsdist built-in web server with the default (read-only) API.

    The tests cover the two authentication schemes (HTTP basic auth and the
    X-API-Key header) and check the layout of the JSON documents returned by
    the REST and jsonstat endpoints.
    """

    # timeout, in seconds, passed to every HTTP request
    _webTimeout = 2.0
    _webServerPort = 8083
    _webServerBasicAuthPassword = 'secret'
    _webServerAPIKey = 'apisecret'
    # paths accessible using the API key only
    _apiOnlyPaths = ['/api/v1/servers/localhost/config', '/api/v1/servers/localhost/config/allow-from', '/api/v1/servers/localhost/statistics']
    # paths accessible using an API key or basic auth
    _statsPaths = ['/jsonstat?command=stats', '/jsonstat?command=dynblocklist', '/api/v1/servers/localhost']
    # paths accessible using basic auth only (list not exhaustive)
    _basicOnlyPaths = ['/', '/index.html']
    # NOTE(review): presumably these attributes are substituted, in order, for
    # the %s placeholders of _config_template by DNSDistTest -- confirm there.
    _config_params = ['_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
    _config_template = """
    setACL({"127.0.0.1/32", "::1/128"})
    newServer{address="127.0.0.1:%s"}
    webserver("127.0.0.1:%s", "%s", "%s")
    """

    def testBasicAuth(self):
        """
        API: Basic Authentication
        """
        for path in self._basicOnlyPaths + self._statsPaths:
            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
            # a wrong password has to be rejected
            r = requests.get(url, auth=('whatever', "evilsecret"), timeout=self._webTimeout)
            self.assertEqual(r.status_code, 401)
            # the configured password has to be accepted
            r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
            self.assertTrue(r)
            self.assertEqual(r.status_code, 200)

    def testXAPIKey(self):
        """
        API: X-Api-Key
        """
        headers = {'x-api-key': self._webServerAPIKey}
        for path in self._apiOnlyPaths + self._statsPaths:
            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
            r = requests.get(url, headers=headers, timeout=self._webTimeout)
            self.assertTrue(r)
            self.assertEqual(r.status_code, 200)

    def testWrongXAPIKey(self):
        """
        API: Wrong X-Api-Key
        """
        headers = {'x-api-key': "evilapikey"}
        for path in self._apiOnlyPaths + self._statsPaths:
            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
            r = requests.get(url, headers=headers, timeout=self._webTimeout)
            self.assertEqual(r.status_code, 401)

    def testBasicAuthOnly(self):
        """
        API: Basic Authentication Only
        """
        # a valid API key must not give access to the basic-auth-only paths
        headers = {'x-api-key': self._webServerAPIKey}
        for path in self._basicOnlyPaths:
            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
            r = requests.get(url, headers=headers, timeout=self._webTimeout)
            self.assertEqual(r.status_code, 401)

    def testAPIKeyOnly(self):
        """
        API: API Key Only
        """
        # valid basic auth credentials must not give access to the API-key-only paths
        for path in self._apiOnlyPaths:
            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
            r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
            self.assertEqual(r.status_code, 401)

    def testServersLocalhost(self):
        """
        API: /api/v1/servers/localhost
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)

        self.assertEqual(content['daemon_type'], 'dnsdist')

        rule_groups = ['response-rules', 'cache-hit-response-rules', 'self-answered-response-rules', 'rules']
        for key in ['version', 'acl', 'local', 'servers', 'frontends', 'pools'] + rule_groups:
            self.assertIn(key, content)

        for rule_group in rule_groups:
            for rule in content[rule_group]:
                for key in ['id', 'creationOrder', 'matches', 'rule', 'action', 'uuid']:
                    self.assertIn(key, rule)
                for key in ['id', 'creationOrder', 'matches']:
                    self.assertGreaterEqual(rule[key], 0)

        for server in content['servers']:
            for key in ['id', 'latency', 'name', 'weight', 'outstanding', 'qpsLimit',
                        'reuseds', 'state', 'address', 'pools', 'qps', 'queries', 'order', 'sendErrors',
                        'dropRate']:
                self.assertIn(key, server)

            for key in ['id', 'latency', 'weight', 'outstanding', 'qpsLimit', 'reuseds',
                        'qps', 'queries', 'order']:
                self.assertGreaterEqual(server[key], 0)

            self.assertIn(server['state'], ['up', 'down', 'UP', 'DOWN'])

        for frontend in content['frontends']:
            for key in ['id', 'address', 'udp', 'tcp', 'type', 'queries']:
                self.assertIn(key, frontend)

            for key in ['id', 'queries']:
                self.assertGreaterEqual(frontend[key], 0)

        for pool in content['pools']:
            for key in ['id', 'name', 'cacheSize', 'cacheEntries', 'cacheHits', 'cacheMisses', 'cacheDeferredInserts', 'cacheDeferredLookups', 'cacheLookupCollisions', 'cacheInsertCollisions', 'cacheTTLTooShorts']:
                self.assertIn(key, pool)

            for key in ['id', 'cacheSize', 'cacheEntries', 'cacheHits', 'cacheMisses', 'cacheDeferredInserts', 'cacheDeferredLookups', 'cacheLookupCollisions', 'cacheInsertCollisions', 'cacheTTLTooShorts']:
                self.assertGreaterEqual(pool[key], 0)

    def testServersIDontExist(self):
        """
        API: /api/v1/servers/idonotexist (should be 404)
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/idonotexist'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertEqual(r.status_code, 404)

    def testServersLocalhostConfig(self):
        """
        API: /api/v1/servers/localhost/config
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost/config'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)

        # flatten the list of settings into a name -> value mapping
        values = {}
        for entry in content:
            for key in ['type', 'name', 'value']:
                self.assertIn(key, entry)

            self.assertEqual(entry['type'], 'ConfigSetting')
            values[entry['name']] = entry['value']

        for key in ['acl', 'control-socket', 'ecs-override', 'ecs-source-prefix-v4',
                    'ecs-source-prefix-v6', 'fixup-case', 'max-outstanding', 'server-policy',
                    'stale-cache-entries-ttl', 'tcp-recv-timeout', 'tcp-send-timeout',
                    'truncate-tc', 'verbose', 'verbose-health-checks']:
            self.assertIn(key, values)

        for key in ['max-outstanding', 'stale-cache-entries-ttl', 'tcp-recv-timeout',
                    'tcp-send-timeout']:
            self.assertGreaterEqual(values[key], 0)

        # prefix lengths have to fit the address family: 0-32 for v4, 0-128 for v6
        self.assertGreaterEqual(values['ecs-source-prefix-v4'], 0)
        self.assertLessEqual(values['ecs-source-prefix-v4'], 32)
        self.assertGreaterEqual(values['ecs-source-prefix-v6'], 0)
        self.assertLessEqual(values['ecs-source-prefix-v6'], 128)

    def testServersLocalhostConfigAllowFrom(self):
        """
        API: /api/v1/servers/localhost/config/allow-from
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost/config/allow-from'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)
        for key in ['type', 'name', 'value']:
            self.assertIn(key, content)

        self.assertEqual(content['name'], 'allow-from')
        self.assertEqual(content['type'], 'ConfigSetting')
        # the ACL set in _config_template; sort both sides so the order
        # of the entries in the answer does not matter
        acl = content['value']
        expectedACL = ["127.0.0.1/32", "::1/128"]
        acl.sort()
        expectedACL.sort()
        self.assertEqual(acl, expectedACL)

    def testServersLocalhostConfigAllowFromPut(self):
        """
        API: PUT /api/v1/servers/localhost/config/allow-from (should be refused)

        The API is read-only by default, so this should be refused
        """
        newACL = ["192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"]
        payload = json.dumps({"name": "allow-from",
                              "type": "ConfigSetting",
                              "value": newACL})
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost/config/allow-from'
        r = requests.put(url, headers=headers, timeout=self._webTimeout, data=payload)
        self.assertFalse(r)
        self.assertEqual(r.status_code, 405)

    def testServersLocalhostStatistics(self):
        """
        API: /api/v1/servers/localhost/statistics
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost/statistics'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)

        # flatten the list of statistics into a name -> value mapping
        values = {}
        for entry in content:
            self.assertIn('type', entry)
            self.assertIn('name', entry)
            self.assertIn('value', entry)
            self.assertEqual(entry['type'], 'StatisticItem')
            values[entry['name']] = entry['value']

        expected = ['responses', 'servfail-responses', 'queries', 'acl-drops',
                    'frontend-noerror', 'frontend-nxdomain', 'frontend-servfail',
                    'rule-drop', 'rule-nxdomain', 'rule-refused', 'self-answered', 'downstream-timeouts',
                    'downstream-send-errors', 'trunc-failures', 'no-policy', 'latency0-1',
                    'latency1-10', 'latency10-50', 'latency50-100', 'latency100-1000',
                    'latency-slow', 'latency-sum', 'latency-count', 'latency-avg100', 'latency-avg1000',
                    'latency-avg10000', 'latency-avg1000000', 'uptime', 'real-memory-usage', 'noncompliant-queries',
                    'noncompliant-responses', 'rdqueries', 'empty-queries', 'cache-hits',
                    'cache-misses', 'cpu-iowait', 'cpu-steal', 'cpu-sys-msec', 'cpu-user-msec', 'fd-usage', 'dyn-blocked',
                    'dyn-block-nmg-size', 'rule-servfail', 'security-status',
                    'udp-in-errors', 'udp-noport-errors', 'udp-recvbuf-errors', 'udp-sndbuf-errors',
                    'doh-query-pipe-full', 'doh-response-pipe-full']

        # every expected statistic is present and non-negative...
        for key in expected:
            self.assertIn(key, values)
            self.assertGreaterEqual(values[key], 0)

        # ... and no statistic is returned that this list does not know about
        for key in values:
            self.assertIn(key, expected)

    def testJsonstatStats(self):
        """
        API: /jsonstat?command=stats
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/jsonstat?command=stats'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)

        expected = ['responses', 'servfail-responses', 'queries', 'acl-drops',
                    'frontend-noerror', 'frontend-nxdomain', 'frontend-servfail',
                    'rule-drop', 'rule-nxdomain', 'rule-refused', 'self-answered', 'downstream-timeouts',
                    'downstream-send-errors', 'trunc-failures', 'no-policy', 'latency0-1',
                    'latency1-10', 'latency10-50', 'latency50-100', 'latency100-1000',
                    'latency-slow', 'latency-avg100', 'latency-avg1000', 'latency-avg10000',
                    'latency-avg1000000', 'uptime', 'real-memory-usage', 'noncompliant-queries',
                    'noncompliant-responses', 'rdqueries', 'empty-queries', 'cache-hits',
                    'cache-misses', 'cpu-user-msec', 'cpu-sys-msec', 'fd-usage', 'dyn-blocked',
                    'dyn-block-nmg-size', 'packetcache-hits', 'packetcache-misses', 'over-capacity-drops',
                    'too-old-drops']

        for key in expected:
            self.assertIn(key, content)
            self.assertGreaterEqual(content[key], 0)

    def testJsonstatDynblocklist(self):
        """
        API: /jsonstat?command=dynblocklist
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/jsonstat?command=dynblocklist'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)

        content = r.json()

        # an empty document is accepted as is; the layout is only checked
        # when there is something in it
        # NOTE(review): the keys are looked up on the top-level document --
        # confirm this matches the layout of a non-empty dynblocklist answer.
        if content:
            for key in ['reason', 'seconds', 'blocks', 'action']:
                self.assertIn(key, content)

            for key in ['blocks']:
                self.assertGreaterEqual(content[key], 0)
295
class TestAPIServerDown(DNSDistTest):
    """
    Check what the API reports for a backend that has been forced down.

    The configuration calls setDown() on the only backend before the web
    server is started.
    """

    # timeout, in seconds, passed to every HTTP request
    _webTimeout = 2.0
    _webServerPort = 8083
    _webServerBasicAuthPassword = 'secret'
    _webServerAPIKey = 'apisecret'
    # NOTE(review): presumably these attributes are substituted, in order, for
    # the %s placeholders of _config_template by DNSDistTest -- confirm there.
    _config_params = ['_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
    _config_template = """
    setACL({"127.0.0.1/32", "::1/128"})
    newServer{address="127.0.0.1:%s"}
    getServer(0):setDown()
    webserver("127.0.0.1:%s", "%s", "%s")
    """

    def testServerDownNoLatencyLocalhost(self):
        """
        API: /api/v1/servers/localhost, no latency for a down server
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost'
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)

        # the latency of the (only) server is expected to be a JSON null
        self.assertIsNone(content['servers'][0]['latency'])
324
class TestAPIWritable(DNSDistTest):
    """
    Exercise the API once it has been made writable with setAPIWritable().

    The configuration points setAPIWritable() at _APIWriteDir, and the test
    expects the new ACL to be written to an 'acl.conf' file in that directory.
    """

    # timeout, in seconds, passed to every HTTP request
    _webTimeout = 2.0
    _webServerPort = 8083
    _webServerBasicAuthPassword = 'secret'
    _webServerAPIKey = 'apisecret'
    # directory passed as second argument to setAPIWritable()
    _APIWriteDir = '/tmp'
    # NOTE(review): presumably these attributes are substituted, in order, for
    # the %s placeholders of _config_template by DNSDistTest -- confirm there.
    _config_params = ['_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey', '_APIWriteDir']
    _config_template = """
    setACL({"127.0.0.1/32", "::1/128"})
    newServer{address="127.0.0.1:%s"}
    webserver("127.0.0.1:%s", "%s", "%s")
    setAPIWritable(true, "%s")
    """

    def testSetACL(self):
        """
        API: Set ACL
        """
        headers = {'x-api-key': self._webServerAPIKey}
        url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/localhost/config/allow-from'

        # step 1: the ACL is initially the one from the configuration
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)
        acl = content['value']
        expectedACL = ["127.0.0.1/32", "::1/128"]
        acl.sort()
        expectedACL.sort()
        self.assertEqual(acl, expectedACL)

        # step 2: replace it; the answer to the PUT carries the new ACL
        # (newACL is written in sorted order, so only the answer is sorted)
        newACL = ["192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"]
        payload = json.dumps({"name": "allow-from",
                              "type": "ConfigSetting",
                              "value": newACL})
        r = requests.put(url, headers=headers, timeout=self._webTimeout, data=payload)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)
        acl = content['value']
        acl.sort()
        self.assertEqual(acl, newACL)

        # step 3: a subsequent GET returns the new ACL as well
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        content = r.json()
        self.assertTrue(content)
        acl = content['value']
        acl.sort()
        self.assertEqual(acl, newACL)

        # step 4: the new ACL has been written to disk
        configFile = os.path.join(self._APIWriteDir, 'acl.conf')
        self.assertTrue(os.path.isfile(configFile))
        with open(configFile, 'rt') as f:
            header = f.readline()
            body = f.readline()

        self.assertEqual(header, """-- Generated by the REST API, DO NOT EDIT\n""")

        # the entries may be written in any order, so accept every permutation
        self.assertIn(body, {
            """setACL({"192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"})\n""",
            """setACL({"192.0.2.0/24", "203.0.113.0/24", "198.51.100.0/24"})\n""",
            """setACL({"198.51.100.0/24", "192.0.2.0/24", "203.0.113.0/24"})\n""",
            """setACL({"198.51.100.0/24", "203.0.113.0/24", "192.0.2.0/24"})\n""",
            """setACL({"203.0.113.0/24", "192.0.2.0/24", "198.51.100.0/24"})\n""",
            """setACL({"203.0.113.0/24", "198.51.100.0/24", "192.0.2.0/24"})\n"""
        })
396
class TestAPICustomHeaders(DNSDistTest):
    """
    Check the custom HTTP response headers of the built-in web server.

    The web server is started with an empty "X-Frame-Options" value and an
    additional "X-Custom" header; one test then replaces the custom headers
    at runtime through a console command.
    """

    # timeout, in seconds, passed to every HTTP request
    _webTimeout = 2.0
    _webServerPort = 8083
    _webServerBasicAuthPassword = 'secret'
    _webServerAPIKey = 'apisecret'
    # path accessible using the API key only
    _apiOnlyPath = '/api/v1/servers/localhost/config'
    # path accessible using basic auth only
    _basicOnlyPath = '/'
    # key passed to setKey() in the configuration, base64-encoded
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    # NOTE(review): presumably these attributes are substituted, in order, for
    # the %s placeholders of _config_template by DNSDistTest -- confirm there.
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    setACL({"127.0.0.1/32", "::1/128"})
    newServer({address="127.0.0.1:%s"})
    webserver("127.0.0.1:%s", "%s", "%s", {["X-Frame-Options"]="", ["X-Custom"]="custom"})
    """

    def testBasicHeaders(self):
        """
        API: Basic custom headers
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath

        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        # the header added in the configuration is sent...
        self.assertEqual(r.headers.get('x-custom'), "custom")
        # ... and the one configured with an empty value is not
        self.assertNotIn("x-frame-options", r.headers)

    def testBasicHeadersUpdate(self):
        """
        API: Basic update of custom headers
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
        self.sendConsoleCommand('setWebserverConfig({customHeaders={["x-powered-by"]="dnsdist"}})')
        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        self.assertEqual(r.headers.get('x-powered-by'), "dnsdist")
        # the new custom headers no longer mention X-Frame-Options, so it is
        # expected to be sent again
        self.assertIn("x-frame-options", r.headers)
443
444
class TestAPIAuth(DNSDistTest):
    """
    Check that the web server credentials can be changed at runtime.

    Each test issues a setWebserverConfig() console command and then verifies
    which credentials are accepted and which are rejected.
    """

    # timeout, in seconds, passed to every HTTP request
    _webTimeout = 2.0
    _webServerPort = 8083
    # credentials from the configuration, and the ones the tests switch to
    _webServerBasicAuthPassword = 'secret'
    _webServerBasicAuthPasswordNew = 'password'
    _webServerAPIKey = 'apisecret'
    _webServerAPIKeyNew = 'apipassword'
    # path accessible using the API key only
    _apiOnlyPath = '/api/v1/servers/localhost/config'
    # path accessible using basic auth only
    _basicOnlyPath = '/'
    # key passed to setKey() in the configuration, base64-encoded
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    # NOTE(review): presumably these attributes are substituted, in order, for
    # the %s placeholders of _config_template by DNSDistTest -- confirm there.
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    setACL({"127.0.0.1/32", "::1/128"})
    newServer{address="127.0.0.1:%s"}
    webserver("127.0.0.1:%s", "%s", "%s")
    """

    def testBasicAuthChange(self):
        """
        API: Basic Authentication updating credentials
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
        self.sendConsoleCommand('setWebserverConfig({{password="{}"}})'.format(self._webServerBasicAuthPasswordNew))

        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPasswordNew), timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)

        # Make sure the old password is not usable any more
        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
        self.assertEqual(r.status_code, 401)

    def testXAPIKeyChange(self):
        """
        API: X-Api-Key updating credentials
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._apiOnlyPath
        self.sendConsoleCommand('setWebserverConfig({{apiKey="{}"}})'.format(self._webServerAPIKeyNew))

        headers = {'x-api-key': self._webServerAPIKeyNew}
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)

        # Make sure the old API key is not usable any more
        headers = {'x-api-key': self._webServerAPIKey}
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertEqual(r.status_code, 401)

    def testBasicAuthOnlyChange(self):
        """
        API: X-Api-Key updated to none (disabled)
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._apiOnlyPath
        self.sendConsoleCommand('setWebserverConfig({{apiKey="{}"}})'.format(self._webServerAPIKeyNew))

        headers = {'x-api-key': self._webServerAPIKeyNew}
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)

        # now disable apiKey
        self.sendConsoleCommand('setWebserverConfig({apiKey=""})')

        # the key that was valid a moment ago is now rejected
        r = requests.get(url, headers=headers, timeout=self._webTimeout)
        self.assertEqual(r.status_code, 401)
520
class TestAPIACL(DNSDistTest):
    """
    Check that the web server ACL is enforced and can be changed at runtime.

    The web server is started with "192.0.2.1" as the last argument of
    webserver(), and the test talks to it from 127.0.0.1.
    """

    # timeout, in seconds, passed to every HTTP request
    _webTimeout = 2.0
    _webServerPort = 8083
    _webServerBasicAuthPassword = 'secret'
    _webServerAPIKey = 'apisecret'
    # key passed to setKey() in the configuration, base64-encoded
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    # NOTE(review): presumably these attributes are substituted, in order, for
    # the %s placeholders of _config_template by DNSDistTest -- confirm there.
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")
    setACL({"127.0.0.1/32", "::1/128"})
    newServer{address="127.0.0.1:%s"}
    webserver("127.0.0.1:%s", "%s", "%s", {}, "192.0.2.1")
    """

    def testACLChange(self):
        """
        API: Should be denied by ACL then allowed
        """

        url = 'http://127.0.0.1:' + str(self._webServerPort) + "/"
        # before the ACL is changed the request must not get an answer at all:
        # a ConnectionError is expected rather than an HTTP error status
        with self.assertRaises(requests.exceptions.ConnectionError):
            requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)

        # reset the ACL
        self.sendConsoleCommand('setWebserverConfig({acl="127.0.0.1"})')

        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)