]>
Commit | Line | Data |
---|---|---|
fc89e57c RG |
1 | import dns |
2 | import requests | |
3 | import socket | |
e7246d1e | 4 | import time |
811bddf9 OM |
5 | import extendederrors |
6 | ||
fc89e57c RG |
7 | from recursortests import RecursorTest |
8 | ||
9 | class RootNXTrustRecursorTest(RecursorTest): | |
10 | ||
11 | def getOutgoingQueriesCount(self): | |
12 | headers = {'x-api-key': self._apiKey} | |
13 | url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics' | |
14 | r = requests.get(url, headers=headers, timeout=self._wsTimeout) | |
15 | self.assertTrue(r) | |
4bfebc93 | 16 | self.assertEqual(r.status_code, 200) |
fc89e57c RG |
17 | self.assertTrue(r.json()) |
18 | content = r.json() | |
19 | for entry in content: | |
20 | if entry['name'] == 'all-outqueries': | |
21 | return int(entry['value']) | |
22 | ||
23 | return 0 | |
24 | ||
e7246d1e O |
25 | # Recursor can still be busy resolving root hints, so wait a bit until |
26 | # getOutgoingQueriesCount() stabilizes. | |
27 | # Code below is inherently racey, but better than a fixed sleep | |
28 | def waitForOutgoingToStabilize(self): | |
29 | for count in range(20): | |
30 | outgoing1 = self.getOutgoingQueriesCount(); | |
31 | time.sleep(0.1); | |
32 | outgoing2 = self.getOutgoingQueriesCount(); | |
33 | if outgoing1 == outgoing2: | |
34 | break | |
35 | ||
fc89e57c RG |
36 | class testRootNXTrustDisabled(RootNXTrustRecursorTest): |
37 | _confdir = 'RootNXTrustDisabled' | |
38 | _wsPort = 8042 | |
39 | _wsTimeout = 2 | |
40 | _wsPassword = 'secretpassword' | |
41 | _apiKey = 'secretapikey' | |
42 | ||
43 | _config_template = """ | |
44 | root-nx-trust=no | |
8949a3e0 | 45 | qname-minimization=no |
fc89e57c RG |
46 | webserver=yes |
47 | webserver-port=%d | |
48 | webserver-address=127.0.0.1 | |
49 | webserver-password=%s | |
50 | api-key=%s | |
7d3d2f4f | 51 | devonly-regression-test-mode |
811bddf9 | 52 | extended-resolution-errors |
fc89e57c RG |
53 | """ % (_wsPort, _wsPassword, _apiKey) |
54 | ||
55 | def testRootNXTrust(self): | |
56 | """ | |
57 | Check that, with root-nx-trust disabled, we still query the root for www2.nx-example. | |
58 | after receiving a NXD from "." for nx-example. as an answer for www.nx-example. | |
59 | """ | |
60 | ||
e6f9befc | 61 | self.waitForTCPSocket("127.0.0.1", self._wsPort) |
e7246d1e O |
62 | self.waitForOutgoingToStabilize() |
63 | # First query nx.example. | |
fc89e57c RG |
64 | before = self.getOutgoingQueriesCount() |
65 | query = dns.message.make_query('www.nx-example.', 'A') | |
66 | res = self.sendUDPQuery(query) | |
67 | ||
68 | self.assertRcodeEqual(res, dns.rcode.NXDOMAIN) | |
69 | print(res) | |
70 | self.assertAuthorityHasSOA(res) | |
71 | ||
72 | # check that we sent one query to the root | |
73 | after = self.getOutgoingQueriesCount() | |
74 | self.assertEqual(after, before + 1) | |
75 | ||
76 | # then query nx2.example. | |
77 | before = after | |
811bddf9 | 78 | query = dns.message.make_query('www2.nx-example.', 'A', use_edns=True) |
fc89e57c RG |
79 | res = self.sendUDPQuery(query) |
80 | ||
81 | self.assertRcodeEqual(res, dns.rcode.NXDOMAIN) | |
82 | self.assertAuthorityHasSOA(res) | |
83 | ||
84 | after = self.getOutgoingQueriesCount() | |
85 | self.assertEqual(after, before + 1) | |
811bddf9 OM |
86 | self.assertEqual(res.edns, 0) |
87 | self.assertEqual(len(res.options), 0) | |
fc89e57c RG |
88 | |
89 | class testRootNXTrustEnabled(RootNXTrustRecursorTest): | |
90 | _confdir = 'RootNXTrustEnabled' | |
91 | _wsPort = 8042 | |
92 | _wsTimeout = 2 | |
93 | _wsPassword = 'secretpassword' | |
94 | _apiKey = 'secretapikey' | |
95 | ||
96 | _config_template = """ | |
97 | root-nx-trust=yes | |
98 | webserver=yes | |
99 | webserver-port=%d | |
100 | webserver-address=127.0.0.1 | |
101 | webserver-password=%s | |
102 | api-key=%s | |
7d3d2f4f | 103 | devonly-regression-test-mode |
811bddf9 | 104 | extended-resolution-errors |
fc89e57c RG |
105 | """ % (_wsPort, _wsPassword, _apiKey) |
106 | ||
107 | def testRootNXTrust(self): | |
108 | """ | |
109 | Check that, with root-nx-trust enabled, we don't query the root for www2.nx-example. | |
110 | after receiving a NXD from "." for nx-example. as an answer for www.nx-example. | |
111 | """ | |
112 | ||
e6f9befc | 113 | self.waitForTCPSocket("127.0.0.1", self._wsPort) |
e7246d1e | 114 | self.waitForOutgoingToStabilize() |
fc89e57c RG |
115 | # first query nx.example. |
116 | before = self.getOutgoingQueriesCount() | |
117 | query = dns.message.make_query('www.nx-example.', 'A') | |
118 | res = self.sendUDPQuery(query) | |
119 | ||
120 | self.assertRcodeEqual(res, dns.rcode.NXDOMAIN) | |
121 | print(res) | |
122 | self.assertAuthorityHasSOA(res) | |
123 | ||
124 | # check that we sent one query to the root | |
125 | after = self.getOutgoingQueriesCount() | |
126 | self.assertEqual(after, before + 1) | |
127 | ||
128 | # then query nx2.example. | |
129 | before = after | |
811bddf9 | 130 | query = dns.message.make_query('www2.nx-example.', 'A', use_edns=True) |
fc89e57c RG |
131 | res = self.sendUDPQuery(query) |
132 | ||
133 | self.assertRcodeEqual(res, dns.rcode.NXDOMAIN) | |
134 | self.assertAuthorityHasSOA(res) | |
135 | ||
136 | after = self.getOutgoingQueriesCount() | |
137 | self.assertEqual(after, before) | |
811bddf9 OM |
138 | self.assertEqual(res.edns, 0) |
139 | self.assertEqual(len(res.options), 1) | |
140 | self.assertEqual(res.options[0].otype, 15) | |
7f8f0893 | 141 | self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized by root-nx-trust')) |