]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.recursor-dnssec/test_RootNXTrust.py
Merge pull request #11431 from jroessler-ox/docs-kskzskroll-update
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_RootNXTrust.py
index afaf221fd125b966469dea3e1b62f36e7e46ce9b..cb1133641b3e8114101e74931a407c23fd6a929c 100644 (file)
@@ -1,6 +1,9 @@
 import dns
 import requests
 import socket
+import time
+import extendederrors
+
 from recursortests import RecursorTest
 
 class RootNXTrustRecursorTest(RecursorTest):
@@ -10,7 +13,7 @@ class RootNXTrustRecursorTest(RecursorTest):
         url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics'
         r = requests.get(url, headers=headers, timeout=self._wsTimeout)
         self.assertTrue(r)
-        self.assertEquals(r.status_code, 200)
+        self.assertEqual(r.status_code, 200)
         self.assertTrue(r.json())
         content = r.json()
         for entry in content:
@@ -19,6 +22,17 @@ class RootNXTrustRecursorTest(RecursorTest):
 
         return 0
 
+    # Recursor can still be busy resolving root hints, so wait a bit until
+    # getOutgoingQueriesCount() stabilizes.
+    # Code below is inherently racey, but better than a fixed sleep
+    def waitForOutgoingToStabilize(self):
+        for count in range(20):
+            outgoing1 = self.getOutgoingQueriesCount();
+            time.sleep(0.1);
+            outgoing2 = self.getOutgoingQueriesCount();
+            if outgoing1 == outgoing2:
+                break
+
 class testRootNXTrustDisabled(RootNXTrustRecursorTest):
     _confdir = 'RootNXTrustDisabled'
     _wsPort = 8042
@@ -34,6 +48,8 @@ webserver-port=%d
 webserver-address=127.0.0.1
 webserver-password=%s
 api-key=%s
+devonly-regression-test-mode
+extended-resolution-errors
 """ % (_wsPort, _wsPassword, _apiKey)
 
     def testRootNXTrust(self):
@@ -42,7 +58,9 @@ api-key=%s
         after receiving a NXD from "." for nx-example. as an answer for www.nx-example.
         """
 
-        # first query nx.example.
+        self.waitForTCPSocket("127.0.0.1", self._wsPort)
+        self.waitForOutgoingToStabilize()
+        # First query nx.example.
         before = self.getOutgoingQueriesCount()
         query = dns.message.make_query('www.nx-example.', 'A')
         res = self.sendUDPQuery(query)
@@ -57,7 +75,7 @@ api-key=%s
 
         # then query nx2.example.
         before = after
-        query = dns.message.make_query('www2.nx-example.', 'A')
+        query = dns.message.make_query('www2.nx-example.', 'A', use_edns=True)
         res = self.sendUDPQuery(query)
 
         self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
@@ -65,6 +83,8 @@ api-key=%s
 
         after = self.getOutgoingQueriesCount()
         self.assertEqual(after, before + 1)
+        self.assertEqual(res.edns, 0)
+        self.assertEqual(len(res.options), 0)
 
 class testRootNXTrustEnabled(RootNXTrustRecursorTest):
     _confdir = 'RootNXTrustEnabled'
@@ -80,6 +100,8 @@ webserver-port=%d
 webserver-address=127.0.0.1
 webserver-password=%s
 api-key=%s
+devonly-regression-test-mode
+extended-resolution-errors
 """ % (_wsPort, _wsPassword, _apiKey)
 
     def testRootNXTrust(self):
@@ -88,6 +110,8 @@ api-key=%s
         after receiving a NXD from "." for nx-example. as an answer for www.nx-example.
         """
 
+        self.waitForTCPSocket("127.0.0.1", self._wsPort)
+        self.waitForOutgoingToStabilize()
         # first query nx.example.
         before = self.getOutgoingQueriesCount()
         query = dns.message.make_query('www.nx-example.', 'A')
@@ -103,7 +127,7 @@ api-key=%s
 
         # then query nx2.example.
         before = after
-        query = dns.message.make_query('www2.nx-example.', 'A')
+        query = dns.message.make_query('www2.nx-example.', 'A', use_edns=True)
         res = self.sendUDPQuery(query)
 
         self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
@@ -111,3 +135,7 @@ api-key=%s
 
         after = self.getOutgoingQueriesCount()
         self.assertEqual(after, before)
+        self.assertEqual(res.edns, 0)
+        self.assertEqual(len(res.options), 1)
+        self.assertEqual(res.options[0].otype, 15)
+        self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized by root-nx-trust'))