]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
auth: add autoprimary API test 11102/head
authorPeter van Dijk <peter.van.dijk@powerdns.com>
Wed, 15 Dec 2021 14:05:33 +0000 (15:05 +0100)
committerPeter van Dijk <peter.van.dijk@powerdns.com>
Mon, 10 Jan 2022 11:31:26 +0000 (12:31 +0100)
regression-tests.api/test_Servers.py

index 4547a59de906f1c3c8337757cab557f0d8389239..47122ebb159387b1976471d9601166646b74d7c7 100644 (file)
@@ -1,6 +1,8 @@
+import json
+import operator
 import requests
 import unittest
-from test_helper import ApiTestCase, is_auth, is_recursor
+from test_helper import ApiTestCase, is_auth, is_recursor, is_auth_lmdb
 
 
 class Servers(ApiTestCase):
@@ -102,3 +104,64 @@ class Servers(ApiTestCase):
         r = requests.get(self.url("/api/v1/servers/localhost/statistics"), auth=('admin', self.server_web_password))
         self.assertEqual(r.status_code, requests.codes.ok)
         self.assert_success_json(r)
+
+    @unittest.skipIf(is_recursor(), "Not applicable")
+    @unittest.skipIf(is_auth_lmdb(), "No autoprimary management in LMDB yet")
+    def test_autoprimaries(self):
+        # verify that we have zero autoprimaries
+        res = self.session.get(self.url("/api/v1/servers/localhost/autoprimaries"), auth=('whatever', self.webServerBasicAuthPassword), timeout=2.0)
+        self.assertEqual(res.status_code, requests.codes.ok)
+        self.assertEqual(res.json(), [])
+
+        # add one
+        payload = {
+            'ip': '192.0.2.1',
+            'nameserver': 'ns.example.com'
+        }
+
+        res = self.session.post(
+            self.url("/api/v1/servers/localhost/autoprimaries"),
+            data=json.dumps(payload),
+            headers={'content-type': 'application/json'})
+
+        self.assertEqual(res.status_code, 201)
+
+        # check that it's there
+        res = self.session.get(self.url("/api/v1/servers/localhost/autoprimaries"), auth=('whatever', self.webServerBasicAuthPassword), timeout=2.0)
+        self.assertEqual(res.status_code, requests.codes.ok)
+        self.assertEqual(res.json(), [{'account': '', 'ip': '192.0.2.1', 'nameserver': 'ns.example.com'}])
+
+        # add another one, this time with an account field
+        payload = {
+            'ip': '192.0.2.2',
+            'nameserver': 'ns.example.org',
+            'account': 'test'
+        }
+
+        res = self.session.post(
+            self.url("/api/v1/servers/localhost/autoprimaries"),
+            data=json.dumps(payload),
+            headers={'content-type': 'application/json'})
+
+        self.assertEqual(res.status_code, 201)
+
+        # check that both are there
+        res = self.session.get(self.url("/api/v1/servers/localhost/autoprimaries"), auth=('whatever', self.webServerBasicAuthPassword), timeout=2.0)
+        self.assertEqual(res.status_code, requests.codes.ok)
+        self.assertEqual(len(res.json()), 2)
+        self.assertEqual(sorted(res.json(), key=operator.itemgetter('ip')), [
+            {'account': '', 'ip': '192.0.2.1', 'nameserver': 'ns.example.com'},
+            {'account': 'test', 'ip': '192.0.2.2', 'nameserver': 'ns.example.org'}
+            ])
+
+        # remove one
+        res = self.session.delete(
+            self.url("/api/v1/servers/localhost/autoprimaries/192.0.2.2/ns.example.org"),
+            headers={'content-type': 'application/json"'})
+
+        self.assertEqual(res.status_code, 204)
+
+        # check that we are back to just one
+        res = self.session.get(self.url("/api/v1/servers/localhost/autoprimaries"), auth=('whatever', self.webServerBasicAuthPassword), timeout=2.0)
+        self.assertEqual(res.status_code, requests.codes.ok)
+        self.assertEqual(res.json(), [{'account': '', 'ip': '192.0.2.1', 'nameserver': 'ns.example.com'}])