]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_API.py
Merge pull request #6952 from phonedph1/root-nx
[thirdparty/pdns.git] / regression-tests.dnsdist / test_API.py
index 5d0483e10dfc9a14f35c43b02617679fc80c8fe7..64bbeeaa893d0b2f7d68b0bb87f041887687ed0c 100644 (file)
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 import os.path
 
+import base64
 import json
 import requests
 from dnsdisttests import DNSDistTest
@@ -30,6 +31,8 @@ class TestAPIBasics(DNSDistTest):
         """
         for path in self._basicOnlyPaths + self._statsPaths:
             url = 'http://127.0.0.1:' + str(self._webServerPort) + path
+            r = requests.get(url, auth=('whatever', "evilsecret"), timeout=self._webTimeout)
+            self.assertEquals(r.status_code, 401)
             r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
             self.assertTrue(r)
             self.assertEquals(r.status_code, 200)
@@ -45,6 +48,15 @@ class TestAPIBasics(DNSDistTest):
             self.assertTrue(r)
             self.assertEquals(r.status_code, 200)
 
+    def testWrongXAPIKey(self):
+        """
+        API: Wrong X-Api-Key
+        """
+        headers = {'x-api-key': "evilapikey"}
+        for path in self._apiOnlyPaths + self._statsPaths:
+            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
+            r = requests.get(url, headers=headers, timeout=self._webTimeout)
+            self.assertEquals(r.status_code, 401)
     def testBasicAuthOnly(self):
         """
         API: Basic Authentication Only
@@ -78,21 +90,15 @@ class TestAPIBasics(DNSDistTest):
 
         self.assertEquals(content['daemon_type'], 'dnsdist')
 
-        rule_groups = ['response-rules', 'cache-hit-response-rules', 'self-answered-response-rules']
-        for key in ['version', 'acl', 'local', 'rules', 'servers', 'frontends', 'pools'] + rule_groups:
+        rule_groups = ['response-rules', 'cache-hit-response-rules', 'self-answered-response-rules', 'rules']
+        for key in ['version', 'acl', 'local', 'servers', 'frontends', 'pools'] + rule_groups:
             self.assertIn(key, content)
 
-        for rule in content['rules']:
-            for key in ['id', 'matches', 'rule', 'action', 'uuid']:
-                self.assertIn(key, rule)
-            for key in ['id', 'matches']:
-                self.assertTrue(rule[key] >= 0)
-
         for rule_group in rule_groups:
             for rule in content[rule_group]:
-                for key in ['id', 'matches', 'rule', 'action', 'uuid']:
+                for key in ['id', 'creationOrder', 'matches', 'rule', 'action', 'uuid']:
                     self.assertIn(key, rule)
-                for key in ['id', 'matches']:
+                for key in ['id', 'creationOrder', 'matches']:
                     self.assertTrue(rule[key] >= 0)
 
         for server in content['servers']:
@@ -178,7 +184,11 @@ class TestAPIBasics(DNSDistTest):
 
         self.assertEquals(content['name'], 'allow-from')
         self.assertEquals(content['type'], 'ConfigSetting')
-        self.assertEquals(content['value'], ["127.0.0.1/32", "::1/128"])
+        acl = content['value']
+        expectedACL = ["127.0.0.1/32", "::1/128"]
+        acl.sort()
+        expectedACL.sort()
+        self.assertEquals(acl, expectedACL)
 
     def testServersLocalhostConfigAllowFromPut(self):
         """
@@ -333,7 +343,11 @@ class TestAPIWritable(DNSDistTest):
         self.assertEquals(r.status_code, 200)
         self.assertTrue(r.json())
         content = r.json()
-        self.assertEquals(content['value'], ["127.0.0.1/32", "::1/128"])
+        acl = content['value']
+        expectedACL = ["127.0.0.1/32", "::1/128"]
+        acl.sort()
+        expectedACL.sort()
+        self.assertEquals(acl, expectedACL)
 
         newACL = ["192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"]
         payload = json.dumps({"name": "allow-from",
@@ -362,3 +376,127 @@ class TestAPIWritable(DNSDistTest):
         self.assertEquals(fileContent, """-- Generated by the REST API, DO NOT EDIT
 setACL({"192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"})
 """)
+
+class TestAPICustomHeaders(DNSDistTest):
+
+    _webTimeout = 2.0
+    _webServerPort = 8083
+    _webServerBasicAuthPassword = 'secret'
+    _webServerAPIKey = 'apisecret'
+    # paths accessible using the API key only
+    _apiOnlyPath = '/api/v1/servers/localhost/config'
+    # paths accessible using basic auth only (list not exhaustive)
+    _basicOnlyPath = '/'
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
+    _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+    setACL({"127.0.0.1/32", "::1/128"})
+    newServer({address="127.0.0.1:%s"})
+    webserver("127.0.0.1:%s", "%s", "%s", {["X-Frame-Options"]="", ["X-Custom"]="custom"})
+    """
+
+    def testBasicHeaders(self):
+        """
+        API: Basic custom headers
+        """
+
+        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
+
+        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+        self.assertEquals(r.headers.get('x-custom'), "custom")
+        self.assertFalse("x-frame-options" in r.headers)
+
+    def testBasicHeadersUpdate(self):
+        """
+        API: Basic update of custom headers
+        """
+
+        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
+        self.sendConsoleCommand('setWebserverConfig({customHeaders={["x-powered-by"]="dnsdist"}})')
+        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+        self.assertEquals(r.headers.get('x-powered-by'), "dnsdist")
+        self.assertTrue("x-frame-options" in r.headers)
+
+
+class TestAPIAuth(DNSDistTest):
+
+    _webTimeout = 2.0
+    _webServerPort = 8083
+    _webServerBasicAuthPassword = 'secret'
+    _webServerBasicAuthPasswordNew = 'password'
+    _webServerAPIKey = 'apisecret'
+    _webServerAPIKeyNew = 'apipassword'
+    # paths accessible using the API key only
+    _apiOnlyPath = '/api/v1/servers/localhost/config'
+    # paths accessible using basic auth only (list not exhaustive)
+    _basicOnlyPath = '/'
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
+    _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+    setACL({"127.0.0.1/32", "::1/128"})
+    newServer{address="127.0.0.1:%s"}
+    webserver("127.0.0.1:%s", "%s", "%s")
+    """
+
+    def testBasicAuthChange(self):
+        """
+        API: Basic Authentication updating credentials
+        """
+
+        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
+        self.sendConsoleCommand('setWebserverConfig({{password="{}"}})'.format(self._webServerBasicAuthPasswordNew))
+
+        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPasswordNew), timeout=self._webTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+
+        # Make sure the old password is not usable any more
+        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+        self.assertEquals(r.status_code, 401)
+
+    def testXAPIKeyChange(self):
+        """
+        API: X-Api-Key updating credentials
+        """
+
+        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._apiOnlyPath
+        self.sendConsoleCommand('setWebserverConfig({{apiKey="{}"}})'.format(self._webServerAPIKeyNew))
+
+        headers = {'x-api-key': self._webServerAPIKeyNew}
+        r = requests.get(url, headers=headers, timeout=self._webTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+
+        # Make sure the old password is not usable any more
+        headers = {'x-api-key': self._webServerAPIKey}
+        r = requests.get(url, headers=headers, timeout=self._webTimeout)
+        self.assertEquals(r.status_code, 401)
+
+    def testBasicAuthOnlyChange(self):
+        """
+        API: X-Api-Key updated to none (disabled)
+        """
+
+        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._apiOnlyPath
+        self.sendConsoleCommand('setWebserverConfig({{apiKey="{}"}})'.format(self._webServerAPIKeyNew))
+
+        headers = {'x-api-key': self._webServerAPIKeyNew}
+        r = requests.get(url, headers=headers, timeout=self._webTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+
+        # now disable apiKey
+        self.sendConsoleCommand('setWebserverConfig({apiKey=""})')
+
+        r = requests.get(url, headers=headers, timeout=self._webTimeout)
+        self.assertEquals(r.status_code, 401)