]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_API.py
Merge pull request #6952 from phonedph1/root-nx
[thirdparty/pdns.git] / regression-tests.dnsdist / test_API.py
index 3d1e802b3e26fd4468a87f03709983b6cfa6f824..64bbeeaa893d0b2f7d68b0bb87f041887687ed0c 100644 (file)
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 import os.path
 
+import base64
 import json
 import requests
 from dnsdisttests import DNSDistTest
@@ -30,6 +31,8 @@ class TestAPIBasics(DNSDistTest):
         """
         for path in self._basicOnlyPaths + self._statsPaths:
             url = 'http://127.0.0.1:' + str(self._webServerPort) + path
+            r = requests.get(url, auth=('whatever', "evilsecret"), timeout=self._webTimeout)
+            self.assertEquals(r.status_code, 401)
             r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
             self.assertTrue(r)
             self.assertEquals(r.status_code, 200)
@@ -45,6 +48,15 @@ class TestAPIBasics(DNSDistTest):
             self.assertTrue(r)
             self.assertEquals(r.status_code, 200)
 
+    def testWrongXAPIKey(self):
+        """
+        API: Wrong X-Api-Key
+        """
+        headers = {'x-api-key': "evilapikey"}
+        for path in self._apiOnlyPaths + self._statsPaths:
+            url = 'http://127.0.0.1:' + str(self._webServerPort) + path
+            r = requests.get(url, headers=headers, timeout=self._webTimeout)
+            self.assertEquals(r.status_code, 401)
     def testBasicAuthOnly(self):
         """
         API: Basic Authentication Only
@@ -78,30 +90,21 @@ class TestAPIBasics(DNSDistTest):
 
         self.assertEquals(content['daemon_type'], 'dnsdist')
 
-        for key in ['version', 'acl', 'local', 'rules', 'response-rules', 'cache-hit-response-rules', 'servers', 'frontends', 'pools']:
+        rule_groups = ['response-rules', 'cache-hit-response-rules', 'self-answered-response-rules', 'rules']
+        for key in ['version', 'acl', 'local', 'servers', 'frontends', 'pools'] + rule_groups:
             self.assertIn(key, content)
 
-        for rule in content['rules']:
-            for key in ['id', 'matches', 'rule', 'action', 'uuid']:
-                self.assertIn(key, rule)
-            for key in ['id', 'matches']:
-                self.assertTrue(rule[key] >= 0)
-
-        for rule in content['response-rules']:
-            for key in ['id', 'matches', 'rule', 'action', 'uuid']:
-                self.assertIn(key, rule)
-            for key in ['id', 'matches']:
-                self.assertTrue(rule[key] >= 0)
-
-        for rule in content['cache-hit-response-rules']:
-            for key in ['id', 'matches', 'rule', 'action']:
-                self.assertIn(key, rule)
-            for key in ['id', 'matches']:
-                self.assertTrue(rule[key] >= 0)
+        for rule_group in rule_groups:
+            for rule in content[rule_group]:
+                for key in ['id', 'creationOrder', 'matches', 'rule', 'action', 'uuid']:
+                    self.assertIn(key, rule)
+                for key in ['id', 'creationOrder', 'matches']:
+                    self.assertTrue(rule[key] >= 0)
 
         for server in content['servers']:
             for key in ['id', 'latency', 'name', 'weight', 'outstanding', 'qpsLimit',
-                        'reuseds', 'state', 'address', 'pools', 'qps', 'queries', 'order', 'sendErrors']:
+                        'reuseds', 'state', 'address', 'pools', 'qps', 'queries', 'order', 'sendErrors',
+                        'dropRate']:
                 self.assertIn(key, server)
 
             for key in ['id', 'latency', 'weight', 'outstanding', 'qpsLimit', 'reuseds',
@@ -181,7 +184,11 @@ class TestAPIBasics(DNSDistTest):
 
         self.assertEquals(content['name'], 'allow-from')
         self.assertEquals(content['type'], 'ConfigSetting')
-        self.assertEquals(content['value'], ["127.0.0.1/32", "::1/128"])
+        acl = content['value']
+        expectedACL = ["127.0.0.1/32", "::1/128"]
+        acl.sort()
+        expectedACL.sort()
+        self.assertEquals(acl, expectedACL)
 
     def testServersLocalhostConfigAllowFromPut(self):
         """
@@ -226,7 +233,7 @@ class TestAPIBasics(DNSDistTest):
                     'latency-avg1000000', 'uptime', 'real-memory-usage', 'noncompliant-queries',
                     'noncompliant-responses', 'rdqueries', 'empty-queries', 'cache-hits',
                     'cache-misses', 'cpu-user-msec', 'cpu-sys-msec', 'fd-usage', 'dyn-blocked',
-                    'dyn-block-nmg-size']
+                    'dyn-block-nmg-size', 'rule-servfail']
 
         for key in expected:
             self.assertIn(key, values)
@@ -275,7 +282,7 @@ class TestAPIBasics(DNSDistTest):
         content = r.json()
 
         if content:
-            for key in ['reason', 'seconds', 'blocks']:
+            for key in ['reason', 'seconds', 'blocks', 'action']:
                 self.assertIn(key, content)
 
             for key in ['blocks']:
@@ -336,7 +343,11 @@ class TestAPIWritable(DNSDistTest):
         self.assertEquals(r.status_code, 200)
         self.assertTrue(r.json())
         content = r.json()
-        self.assertEquals(content['value'], ["127.0.0.1/32", "::1/128"])
+        acl = content['value']
+        expectedACL = ["127.0.0.1/32", "::1/128"]
+        acl.sort()
+        expectedACL.sort()
+        self.assertEquals(acl, expectedACL)
 
         newACL = ["192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"]
         payload = json.dumps({"name": "allow-from",
@@ -359,9 +370,133 @@ class TestAPIWritable(DNSDistTest):
         configFile = self._APIWriteDir + '/' + 'acl.conf'
         self.assertTrue(os.path.isfile(configFile))
         fileContent = None
-        with file(configFile) as f:
+        with open(configFile, 'rt') as f:
             fileContent = f.read()
 
         self.assertEquals(fileContent, """-- Generated by the REST API, DO NOT EDIT
 setACL({"192.0.2.0/24", "198.51.100.0/24", "203.0.113.0/24"})
 """)
+
+class TestAPICustomHeaders(DNSDistTest):
+
+    _webTimeout = 2.0
+    _webServerPort = 8083
+    _webServerBasicAuthPassword = 'secret'
+    _webServerAPIKey = 'apisecret'
+    # paths accessible using the API key only
+    _apiOnlyPath = '/api/v1/servers/localhost/config'
+    # paths accessible using basic auth only (list not exhaustive)
+    _basicOnlyPath = '/'
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
+    _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+    setACL({"127.0.0.1/32", "::1/128"})
+    newServer({address="127.0.0.1:%s"})
+    webserver("127.0.0.1:%s", "%s", "%s", {["X-Frame-Options"]="", ["X-Custom"]="custom"})
+    """
+
+    def testBasicHeaders(self):
+        """
+        API: Basic custom headers
+        """
+
+        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
+
+        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+        self.assertEquals(r.headers.get('x-custom'), "custom")
+        self.assertFalse("x-frame-options" in r.headers)
+
+    def testBasicHeadersUpdate(self):
+        """
+        API: Basic update of custom headers
+        """
+
+        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
+        self.sendConsoleCommand('setWebserverConfig({customHeaders={["x-powered-by"]="dnsdist"}})')
+        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+        self.assertEquals(r.headers.get('x-powered-by'), "dnsdist")
+        self.assertTrue("x-frame-options" in r.headers)
+
+
+class TestAPIAuth(DNSDistTest):
+
+    _webTimeout = 2.0
+    _webServerPort = 8083
+    _webServerBasicAuthPassword = 'secret'
+    _webServerBasicAuthPasswordNew = 'password'
+    _webServerAPIKey = 'apisecret'
+    _webServerAPIKeyNew = 'apipassword'
+    # paths accessible using the API key only
+    _apiOnlyPath = '/api/v1/servers/localhost/config'
+    # paths accessible using basic auth only (list not exhaustive)
+    _basicOnlyPath = '/'
+    _consoleKey = DNSDistTest.generateConsoleKey()
+    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_webServerPort', '_webServerBasicAuthPassword', '_webServerAPIKey']
+    _config_template = """
+    setKey("%s")
+    controlSocket("127.0.0.1:%s")
+    setACL({"127.0.0.1/32", "::1/128"})
+    newServer{address="127.0.0.1:%s"}
+    webserver("127.0.0.1:%s", "%s", "%s")
+    """
+
+    def testBasicAuthChange(self):
+        """
+        API: Basic Authentication updating credentials
+        """
+
+        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._basicOnlyPath
+        self.sendConsoleCommand('setWebserverConfig({{password="{}"}})'.format(self._webServerBasicAuthPasswordNew))
+
+        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPasswordNew), timeout=self._webTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+
+        # Make sure the old password is not usable any more
+        r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+        self.assertEquals(r.status_code, 401)
+
+    def testXAPIKeyChange(self):
+        """
+        API: X-Api-Key updating credentials
+        """
+
+        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._apiOnlyPath
+        self.sendConsoleCommand('setWebserverConfig({{apiKey="{}"}})'.format(self._webServerAPIKeyNew))
+
+        headers = {'x-api-key': self._webServerAPIKeyNew}
+        r = requests.get(url, headers=headers, timeout=self._webTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+
+        # Make sure the old password is not usable any more
+        headers = {'x-api-key': self._webServerAPIKey}
+        r = requests.get(url, headers=headers, timeout=self._webTimeout)
+        self.assertEquals(r.status_code, 401)
+
+    def testBasicAuthOnlyChange(self):
+        """
+        API: X-Api-Key updated to none (disabled)
+        """
+
+        url = 'http://127.0.0.1:' + str(self._webServerPort) + self._apiOnlyPath
+        self.sendConsoleCommand('setWebserverConfig({{apiKey="{}"}})'.format(self._webServerAPIKeyNew))
+
+        headers = {'x-api-key': self._webServerAPIKeyNew}
+        r = requests.get(url, headers=headers, timeout=self._webTimeout)
+        self.assertTrue(r)
+        self.assertEquals(r.status_code, 200)
+
+        # now disable apiKey
+        self.sendConsoleCommand('setWebserverConfig({apiKey=""})')
+
+        r = requests.get(url, headers=headers, timeout=self._webTimeout)
+        self.assertEquals(r.status_code, 401)