url = 'http://127.0.0.1:' + str(self._webServerPort) + path
r = requests.get(url, headers=headers, timeout=self._webTimeout)
self.assertEquals(r.status_code, 401)
+
def testBasicAuthOnly(self):
"""
API: Basic Authentication Only
def testServersIDontExist(self):
"""
- API: /api/v1/servers/idontexist (should be 404)
+ API: /api/v1/servers/idonotexist (should be 404)
"""
headers = {'x-api-key': self._webServerAPIKey}
- url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/idontexist'
+ url = 'http://127.0.0.1:' + str(self._webServerPort) + '/api/v1/servers/idonotexist'
r = requests.get(url, headers=headers, timeout=self._webTimeout)
self.assertEquals(r.status_code, 404)
'latency-slow', 'latency-sum', 'latency-count', 'latency-avg100', 'latency-avg1000',
'latency-avg10000', 'latency-avg1000000', 'uptime', 'real-memory-usage', 'noncompliant-queries',
'noncompliant-responses', 'rdqueries', 'empty-queries', 'cache-hits',
- 'cache-misses', 'cpu-user-msec', 'cpu-sys-msec', 'fd-usage', 'dyn-blocked',
- 'dyn-block-nmg-size', 'rule-servfail', 'security-status']
+ 'cache-misses', 'cpu-iowait', 'cpu-steal', 'cpu-sys-msec', 'cpu-user-msec', 'fd-usage', 'dyn-blocked',
+ 'dyn-block-nmg-size', 'rule-servfail', 'security-status',
+ 'udp-in-errors', 'udp-noport-errors', 'udp-recvbuf-errors', 'udp-sndbuf-errors',
+ 'doh-query-pipe-full', 'doh-response-pipe-full']
for key in expected:
self.assertIn(key, values)
r = requests.get(url, headers=headers, timeout=self._webTimeout)
self.assertEquals(r.status_code, 401)
+
+class TestAPIACL(DNSDistTest):
+
+ _webTimeout = 2.0
+ _webServerPort = 8083
+ _webServerBasicAuthPassword = 'secret'
+ _webServerAPIKey = 'apisecret'
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
+ _config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+ setACL({"127.0.0.1/32", "::1/128"})
+ newServer{address="127.0.0.1:%s"}
+ webserver("127.0.0.1:%s", "%s", "%s", {}, "192.0.2.1")
+ """
+
+ def testACLChange(self):
+ """
+ API: Should be denied by ACL then allowed
+ """
+
+ url = 'http://127.0.0.1:' + str(self._webServerPort) + "/"
+ try:
+ r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+ self.assertTrue(False)
+ except requests.exceptions.ConnectionError as exp:
+ pass
+
+ # reset the ACL
+ self.sendConsoleCommand('setWebserverConfig({acl="127.0.0.1"})')
+
+ r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+ self.assertTrue(r)
+ self.assertEquals(r.status_code, 200)