]>
Commit | Line | Data |
---|---|---|
fc89e57c RG |
1 | import dns |
2 | import requests | |
3 | import socket | |
4 | from recursortests import RecursorTest | |
5 | ||
class RootNXTrustRecursorTest(RecursorTest):
    """Shared base for the root-nx-trust tests.

    Subclasses must define ``_apiKey``, ``_wsPort`` and ``_wsTimeout``;
    they are read here to query the statistics endpoint of the web API.
    """

    def getOutgoingQueriesCount(self):
        """Return the current value of the 'all-outqueries' statistic.

        The value is fetched over HTTP from the statistics endpoint on
        127.0.0.1. Returns 0 when the statistics list contains no
        'all-outqueries' entry.
        """
        headers = {'x-api-key': self._apiKey}
        url = 'http://127.0.0.1:' + str(self._wsPort) + '/api/v1/servers/localhost/statistics'
        r = requests.get(url, headers=headers, timeout=self._wsTimeout)
        self.assertTrue(r)
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(r.status_code, 200)
        # Decode the body once and reuse it for both the check and the scan.
        content = r.json()
        self.assertTrue(content)
        for entry in content:
            if entry['name'] == 'all-outqueries':
                return int(entry['value'])

        return 0
21 | ||
class testRootNXTrustDisabled(RootNXTrustRecursorTest):
    """Run the recursor with root-nx-trust=no and the web API enabled."""

    _confdir = 'RootNXTrustDisabled'
    # Web server / API settings: interpolated into the configuration below
    # and read by getOutgoingQueriesCount() to reach the statistics endpoint.
    _wsPort = 8042
    _wsTimeout = 2
    _wsPassword = 'secretpassword'
    _apiKey = 'secretapikey'

    _config_template = """
root-nx-trust=no
webserver=yes
webserver-port=%d
webserver-address=127.0.0.1
webserver-password=%s
api-key=%s
""" % (_wsPort, _wsPassword, _apiKey)

    def testRootNXTrust(self):
        """
        Check that, with root-nx-trust disabled, we still query the root for www2.nx-example.
        after receiving a NXD from "." for nx-example. as an answer for www.nx-example.
        """

        # First lookup: www.nx-example.
        before = self.getOutgoingQueriesCount()
        query = dns.message.make_query('www.nx-example.', 'A')
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
        self.assertAuthorityHasSOA(res)

        # Exactly one outgoing query must have been sent for it
        # (expected to be the query to the root).
        after = self.getOutgoingQueriesCount()
        self.assertEqual(after, before + 1)

        # Second lookup: www2.nx-example., a sibling name under nx-example.
        before = after
        query = dns.message.make_query('www2.nx-example.', 'A')
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
        self.assertAuthorityHasSOA(res)

        # With root-nx-trust off, the second name must trigger one more
        # outgoing query.
        after = self.getOutgoingQueriesCount()
        self.assertEqual(after, before + 1)
67 | ||
class testRootNXTrustEnabled(RootNXTrustRecursorTest):
    """Run the recursor with root-nx-trust=yes and the web API enabled."""

    _confdir = 'RootNXTrustEnabled'
    # Web server / API settings: interpolated into the configuration below
    # and read by getOutgoingQueriesCount() to reach the statistics endpoint.
    _wsPort = 8042
    _wsTimeout = 2
    _wsPassword = 'secretpassword'
    _apiKey = 'secretapikey'

    _config_template = """
root-nx-trust=yes
webserver=yes
webserver-port=%d
webserver-address=127.0.0.1
webserver-password=%s
api-key=%s
""" % (_wsPort, _wsPassword, _apiKey)

    def testRootNXTrust(self):
        """
        Check that, with root-nx-trust enabled, we don't query the root for www2.nx-example.
        after receiving a NXD from "." for nx-example. as an answer for www.nx-example.
        """

        # First lookup: www.nx-example.
        before = self.getOutgoingQueriesCount()
        query = dns.message.make_query('www.nx-example.', 'A')
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
        self.assertAuthorityHasSOA(res)

        # Exactly one outgoing query must have been sent for it
        # (expected to be the query to the root).
        after = self.getOutgoingQueriesCount()
        self.assertEqual(after, before + 1)

        # Second lookup: www2.nx-example., a sibling name under nx-example.
        before = after
        query = dns.message.make_query('www2.nx-example.', 'A')
        res = self.sendUDPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NXDOMAIN)
        self.assertAuthorityHasSOA(res)

        # With root-nx-trust on, the second name must be answered without
        # any additional outgoing query.
        after = self.getOutgoingQueriesCount()
        self.assertEqual(after, before)