#!/usr/bin/env python
import os.path
+import base64
import json
import requests
from dnsdisttests import DNSDistTest
"""
for path in self._basicOnlyPaths + self._statsPaths:
url = 'http://127.0.0.1:' + str(self._webServerPort) + path
+ r = requests.get(url, auth=('whatever', "evilsecret"), timeout=self._webTimeout)
+ self.assertEquals(r.status_code, 401)
r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
self.assertTrue(r)
self.assertEquals(r.status_code, 200)
self.assertTrue(r)
self.assertEquals(r.status_code, 200)
+ def testWrongXAPIKey(self):
+ """
+ API: Wrong X-Api-Key
+ """
+ headers = {'x-api-key': "evilapikey"}
+ for path in self._apiOnlyPaths + self._statsPaths:
+ url = 'http://127.0.0.1:' + str(self._webServerPort) + path
+ r = requests.get(url, headers=headers, timeout=self._webTimeout)
+ self.assertEquals(r.status_code, 401)
def testBasicAuthOnly(self):
"""
API: Basic Authentication Only
self.assertEquals(content['daemon_type'], 'dnsdist')
- rule_groups = ['response-rules', 'cache-hit-response-rules', 'self-answered-response-rules']
- for key in ['version', 'acl', 'local', 'rules', 'servers', 'frontends', 'pools'] + rule_groups:
+ rule_groups = ['response-rules', 'cache-hit-response-rules', 'self-answered-response-rules', 'rules']
+ for key in ['version', 'acl', 'local', 'servers', 'frontends', 'pools'] + rule_groups:
self.assertIn(key, content)
- for rule in content['rules']:
- for key in ['id', 'matches', 'rule', 'action', 'uuid']:
- self.assertIn(key, rule)
- for key in ['id', 'matches']:
- self.assertTrue(rule[key] >= 0)
-
for rule_group in rule_groups:
for rule in content[rule_group]:
- for key in ['id', 'matches', 'rule', 'action', 'uuid']:
+ for key in ['id', 'creationOrder', 'matches', 'rule', 'action', 'uuid']:
self.assertIn(key, rule)
- for key in ['id', 'matches']:
+ for key in ['id', 'creationOrder', 'matches']:
self.assertTrue(rule[key] >= 0)
for server in content['servers']:
for key in ['id', 'latency', 'name', 'weight', 'outstanding', 'qpsLimit',
- 'reuseds', 'state', 'address', 'pools', 'qps', 'queries', 'order', 'sendErrors']:
+ 'reuseds', 'state', 'address', 'pools', 'qps', 'queries', 'order', 'sendErrors',
+ 'dropRate']:
self.assertIn(key, server)
for key in ['id', 'latency', 'weight', 'outstanding', 'qpsLimit', 'reuseds',
self.assertEquals(content['name'], 'allow-from')
self.assertEquals(content['type'], 'ConfigSetting')
- self.assertEquals(content['value'], ["127.0.0.1/32", "::1/128"])
+ acl = content['value']
+ expectedACL = ["127.0.0.1/32", "::1/128"]
+ acl.sort()
+ expectedACL.sort()
+ self.assertEquals(acl, expectedACL)
def testServersLocalhostConfigAllowFromPut(self):
"""
content = r.json()
if content:
- for key in ['reason', 'seconds', 'blocks']:
+ for key in ['reason', 'seconds', 'blocks', 'action']:
self.assertIn(key, content)
for key in ['blocks']:
self.assertEquals(r.status_code, 200)
self.assertTrue(r.json())
content = r.json()
- self.assertEquals(content['value'], ["127.0.0.1/32", "::1/128"])
+ acl = content['value']
+ expectedACL = ["127.0.0.1/32", "::1/128"]
+ acl.sort()
+ expectedACL.sort()
+ self.assertEquals(acl, expectedACL)
newACL = ["192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"]
payload = json.dumps({"name": "allow-from",
self.assertEquals(fileContent, """-- Generated by the REST API, DO NOT EDIT
setACL({"192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"})
""")
+
+class TestAPICustomHeaders(DNSDistTest):
+
+ _webTimeout = 2.0
+ _webServerPort = 8083
+ _webServerBasicAuthPassword = 'secret'
+ _webServerAPIKey = 'apisecret'
+ # paths accessible using the API key only
+ _apiOnlyPath = '/api/v1/servers/localhost/config'
+ # paths accessible using basic auth only (list not exhaustive)
+ _basicOnlyPath = '/'
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
+ _config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+ setACL({"127.0.0.1/32", "::1/128"})
+ newServer({address="127.0.0.1:%s"})
+ webserver("127.0.0.1:%s", "%s", "%s", {["X-Frame-Options"]="", ["X-Custom"]="custom"})
+ """
+
+ def testBasicHeaders(self):
+ """
+ API: Basic custom headers
+ """
+
+ url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
+
+ r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+ self.assertTrue(r)
+ self.assertEquals(r.status_code, 200)
+ self.assertEquals(r.headers.get('x-custom'), "custom")
+ self.assertFalse("x-frame-options" in r.headers)
+
+ def testBasicHeadersUpdate(self):
+ """
+ API: Basic update of custom headers
+ """
+
+ url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
+ self.sendConsoleCommand('setWebserverConfig({customHeaders={["x-powered-by"]="dnsdist"}})')
+ r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+ self.assertTrue(r)
+ self.assertEquals(r.status_code, 200)
+ self.assertEquals(r.headers.get('x-powered-by'), "dnsdist")
+ self.assertTrue("x-frame-options" in r.headers)
+
+
+class TestAPIAuth(DNSDistTest):
+
+ _webTimeout = 2.0
+ _webServerPort = 8083
+ _webServerBasicAuthPassword = 'secret'
+ _webServerBasicAuthPasswordNew = 'password'
+ _webServerAPIKey = 'apisecret'
+ _webServerAPIKeyNew = 'apipassword'
+ # paths accessible using the API key only
+ _apiOnlyPath = '/api/v1/servers/localhost/config'
+ # paths accessible using basic auth only (list not exhaustive)
+ _basicOnlyPath = '/'
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
+ _config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+ setACL({"127.0.0.1/32", "::1/128"})
+ newServer{address="127.0.0.1:%s"}
+ webserver("127.0.0.1:%s", "%s", "%s")
+ """
+
+ def testBasicAuthChange(self):
+ """
+ API: Basic Authentication updating credentials
+ """
+
+ url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
+ self.sendConsoleCommand('setWebserverConfig({{password="{}"}})'.format(self._webServerBasicAuthPasswordNew))
+
+ r = requests.get(url, auth=('whatever', self._webServerBasicAuthPasswordNew), timeout=self._webTimeout)
+ self.assertTrue(r)
+ self.assertEquals(r.status_code, 200)
+
+ # Make sure the old password is not usable any more
+ r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+ self.assertEquals(r.status_code, 401)
+
+ def testXAPIKeyChange(self):
+ """
+ API: X-Api-Key updating credentials
+ """
+
+ url = 'http://127.0.0.1:' + str(self._webServerPort) + self._apiOnlyPath
+ self.sendConsoleCommand('setWebserverConfig({{apiKey="{}"}})'.format(self._webServerAPIKeyNew))
+
+ headers = {'x-api-key': self._webServerAPIKeyNew}
+ r = requests.get(url, headers=headers, timeout=self._webTimeout)
+ self.assertTrue(r)
+ self.assertEquals(r.status_code, 200)
+
+ # Make sure the old password is not usable any more
+ headers = {'x-api-key': self._webServerAPIKey}
+ r = requests.get(url, headers=headers, timeout=self._webTimeout)
+ self.assertEquals(r.status_code, 401)
+
+ def testBasicAuthOnlyChange(self):
+ """
+ API: X-Api-Key updated to none (disabled)
+ """
+
+ url = 'http://127.0.0.1:' + str(self._webServerPort) + self._apiOnlyPath
+ self.sendConsoleCommand('setWebserverConfig({{apiKey="{}"}})'.format(self._webServerAPIKeyNew))
+
+ headers = {'x-api-key': self._webServerAPIKeyNew}
+ r = requests.get(url, headers=headers, timeout=self._webTimeout)
+ self.assertTrue(r)
+ self.assertEquals(r.status_code, 200)
+
+ # now disable apiKey
+ self.sendConsoleCommand('setWebserverConfig({apiKey=""})')
+
+ r = requests.get(url, headers=headers, timeout=self._webTimeout)
+ self.assertEquals(r.status_code, 401)