]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_RootNXTrust.py
Merge pull request #13509 from rgacogne/ddist-teeaction-proxyprotocol
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_RootNXTrust.py
1 import dns
2 import requests
3 import socket
4 import time
5 import extendederrors
6
7 from recursortests import RecursorTest
8
9 class RootNXTrustRecursorTest(RecursorTest):
10
11 def getOutgoingQueriesCount(self):
12 headers = {'x-api-key': self._apiKey}
13 url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics'
14 r = requests.get(url, headers=headers, timeout=self._wsTimeout)
15 self.assertTrue(r)
16 self.assertEqual(r.status_code, 200)
17 self.assertTrue(r.json())
18 content = r.json()
19 for entry in content:
20 if entry['name'] == 'all-outqueries':
21 return int(entry['value'])
22
23 return 0
24
25 # Recursor can still be busy resolving root hints, so wait a bit until
26 # getOutgoingQueriesCount() stabilizes.
27 # Code below is inherently racey, but better than a fixed sleep
28 def waitForOutgoingToStabilize(self):
29 for count in range(20):
30 outgoing1 = self.getOutgoingQueriesCount();
31 time.sleep(0.1);
32 outgoing2 = self.getOutgoingQueriesCount();
33 if outgoing1 == outgoing2:
34 break
35
36 class testRootNXTrustDisabled(RootNXTrustRecursorTest):
37 _confdir = 'RootNXTrustDisabled'
38 _wsPort = 8042
39 _wsTimeout = 2
40 _wsPassword = 'secretpassword'
41 _apiKey = 'secretapikey'
42
43 _config_template = """
44 root-nx-trust=no
45 qname-minimization=no
46 webserver=yes
47 webserver-port=%d
48 webserver-address=127.0.0.1
49 webserver-password=%s
50 api-key=%s
51 devonly-regression-test-mode
52 extended-resolution-errors
53 """ % (_wsPort, _wsPassword, _apiKey)
54
55 def testRootNXTrust(self):
56 """
57 Check that, with root-nx-trust disabled, we still query the root for www2.nx-example.
58 after receiving a NXD from "." for nx-example. as an answer for www.nx-example.
59 """
60
61 self.waitForTCPSocket("127.0.0.1", self._wsPort)
62 self.waitForOutgoingToStabilize()
63 # First query nx.example.
64 before = self.getOutgoingQueriesCount()
65 query = dns.message.make_query('www.nx-example.', 'A')
66 res = self.sendUDPQuery(query)
67
68 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
69 print(res)
70 self.assertAuthorityHasSOA(res)
71
72 # check that we sent one query to the root
73 after = self.getOutgoingQueriesCount()
74 self.assertEqual(after, before + 1)
75
76 # then query nx2.example.
77 before = after
78 query = dns.message.make_query('www2.nx-example.', 'A', use_edns=True)
79 res = self.sendUDPQuery(query)
80
81 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
82 self.assertAuthorityHasSOA(res)
83
84 after = self.getOutgoingQueriesCount()
85 self.assertEqual(after, before + 1)
86 self.assertEqual(res.edns, 0)
87 self.assertEqual(len(res.options), 0)
88
89 class testRootNXTrustEnabled(RootNXTrustRecursorTest):
90 _confdir = 'RootNXTrustEnabled'
91 _wsPort = 8042
92 _wsTimeout = 2
93 _wsPassword = 'secretpassword'
94 _apiKey = 'secretapikey'
95
96 _config_template = """
97 root-nx-trust=yes
98 webserver=yes
99 webserver-port=%d
100 webserver-address=127.0.0.1
101 webserver-password=%s
102 api-key=%s
103 devonly-regression-test-mode
104 extended-resolution-errors
105 """ % (_wsPort, _wsPassword, _apiKey)
106
107 def testRootNXTrust(self):
108 """
109 Check that, with root-nx-trust enabled, we don't query the root for www2.nx-example.
110 after receiving a NXD from "." for nx-example. as an answer for www.nx-example.
111 """
112
113 self.waitForTCPSocket("127.0.0.1", self._wsPort)
114 self.waitForOutgoingToStabilize()
115 # first query nx.example.
116 before = self.getOutgoingQueriesCount()
117 query = dns.message.make_query('www.nx-example.', 'A')
118 res = self.sendUDPQuery(query)
119
120 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
121 print(res)
122 self.assertAuthorityHasSOA(res)
123
124 # check that we sent one query to the root
125 after = self.getOutgoingQueriesCount()
126 self.assertEqual(after, before + 1)
127
128 # then query nx2.example.
129 before = after
130 query = dns.message.make_query('www2.nx-example.', 'A', use_edns=True)
131 res = self.sendUDPQuery(query)
132
133 self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
134 self.assertAuthorityHasSOA(res)
135
136 after = self.getOutgoingQueriesCount()
137 self.assertEqual(after, before)
138 self.assertEqual(res.edns, 0)
139 self.assertEqual(len(res.options), 1)
140 self.assertEqual(res.options[0].otype, 15)
141 self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized by root-nx-trust'))