Commit | Line | Data |
---|---|---|
c51c551e OM |
1 | import dns |
2 | import os | |
3 | from recursortests import RecursorTest | |
4 | ||
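class testSimpleTCP(RecursorTest):
    """DNSSEC validation checks for the recursor, sent over TCP.

    The recursor is configured with ``dnssec=validate`` plus one local auth
    zone (``authzone.example``) that generateRecursorConfig() writes to disk.
    ``_SOA`` and ``_PREFIX`` are not defined here; presumably they come from
    RecursorTest -- confirm against the base class.
    """

    _confdir = 'SimpleTCP'

    _config_template = """dnssec=validate
auth-zones=authzone.example=configs/%s/authzone.zone""" % _confdir

    @classmethod
    def generateRecursorConfig(cls, confdir):
        """Write the auth zone file into *confdir*, then defer to the base class.

        The zone holds one SOA record (from ``cls._SOA``) and one A record
        for the apex, 192.0.2.88, which testAuthZone expects back.
        """
        authzonepath = os.path.join(confdir, 'authzone.zone')
        with open(authzonepath, 'w') as authzone:
            authzone.write("""$ORIGIN authzone.example.
@ 3600 IN SOA {soa}
@ 3600 IN A 192.0.2.88
""".format(soa=cls._SOA))
        super(testSimpleTCP, cls).generateRecursorConfig(confdir)

    def testSOAs(self):
        """SOA queries for the root and two signed zones must validate.

        Every reply has to pass the authenticated-message check and carry
        both the expected SOA rrset and a matching RRSIG.
        """
        for zone in ['.', 'example.', 'secure.example.']:
            expected = dns.rrset.from_text(zone, 0, dns.rdataclass.IN, 'SOA', self._SOA)
            query = dns.message.make_query(zone, 'SOA', want_dnssec=True)
            query.flags |= dns.flags.AD

            res = self.sendTCPQuery(query)

            self.assertMessageIsAuthenticated(res)
            self.assertRRsetInAnswer(res, expected)
            self.assertMatchingRRSIGInAnswer(res, expected)

    def testA(self):
        """An A query inside the signed zone must validate and carry an RRSIG."""
        expected = dns.rrset.from_text('ns.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.9'.format(prefix=self._PREFIX))
        query = dns.message.make_query('ns.secure.example', 'A', want_dnssec=True)
        query.flags |= dns.flags.AD

        res = self.sendTCPQuery(query)

        self.assertMessageIsAuthenticated(res)
        self.assertRRsetInAnswer(res, expected)
        self.assertMatchingRRSIGInAnswer(res, expected)

    def testDelegation(self):
        """The NS set for ``example`` must come back authenticated.

        Unlike testSOAs and testA, no matching RRSIG is asserted here.
        """
        query = dns.message.make_query('example', 'NS', want_dnssec=True)
        query.flags |= dns.flags.AD

        expectedNS = dns.rrset.from_text('example.', 0, 'IN', 'NS', 'ns1.example.', 'ns2.example.')

        res = self.sendTCPQuery(query)

        self.assertMessageIsAuthenticated(res)
        self.assertRRsetInAnswer(res, expectedNS)

    def testBogus(self):
        """A name under ``bogus.example`` must be answered with SERVFAIL.

        Only the rcode is checked; the reply sections are not inspected.
        """
        query = dns.message.make_query('ted.bogus.example', 'A', want_dnssec=True)

        res = self.sendTCPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.SERVFAIL)

    def testAuthZone(self):
        """The locally configured auth zone answers NOERROR with its A record.

        The query does not set AD and the reply is not checked for
        authentication, only for rcode and the A rrset.
        """
        query = dns.message.make_query('authzone.example', 'A', want_dnssec=True)

        expectedA = dns.rrset.from_text('authzone.example.', 0, 'IN', 'A', '192.0.2.88')

        res = self.sendTCPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expectedA)

    def testLocalhost(self):
        """``localhost`` and its reverse name resolve to the loopback records."""
        queryA = dns.message.make_query('localhost', 'A', want_dnssec=True)
        expectedA = dns.rrset.from_text('localhost.', 0, 'IN', 'A', '127.0.0.1')

        queryPTR = dns.message.make_query('1.0.0.127.in-addr.arpa', 'PTR', want_dnssec=True)
        expectedPTR = dns.rrset.from_text('1.0.0.127.in-addr.arpa.', 0, 'IN', 'PTR', 'localhost.')

        resA = self.sendTCPQuery(queryA)
        resPTR = self.sendTCPQuery(queryPTR)

        self.assertRcodeEqual(resA, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(resA, expectedA)

        self.assertRcodeEqual(resPTR, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(resPTR, expectedPTR)

    def testLocalhostSubdomain(self):
        """A name below ``localhost`` also resolves to 127.0.0.1."""
        queryA = dns.message.make_query('foo.localhost', 'A', want_dnssec=True)
        expectedA = dns.rrset.from_text('foo.localhost.', 0, 'IN', 'A', '127.0.0.1')

        resA = self.sendTCPQuery(queryA)

        self.assertRcodeEqual(resA, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(resA, expectedA)

    def testIslandOfSecurity(self):
        """A CNAME from the signed zone into ``islandofsecurity.example`` resolves.

        Only the rcode and the final A rrset are asserted.
        """
        query = dns.message.make_query('cname-to-islandofsecurity.secure.example.', 'A', want_dnssec=True)

        # NOTE(review): expectedCNAME is built but never asserted, so the
        # CNAME step of the chain is not verified by this test. Either assert
        # it against the answer or drop it -- confirm what was intended.
        expectedCNAME = dns.rrset.from_text('cname-to-islandofsecurity.secure.example.', 0, 'IN', 'CNAME', 'node1.islandofsecurity.example.')
        expectedA = dns.rrset.from_text('node1.islandofsecurity.example.', 0, 'IN', 'A', '192.0.2.20')

        res = self.sendTCPQuery(query)

        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expectedA)

    def testVeryBasicPipeline(self):
        """Send four queries through sendTCPQueries() and validate each reply.

        Replies are matched to expectations by the question name they echo,
        and each expected name must be answered exactly once.
        """
        # This test does not enforce order, it will accept replies in any order. So
        # it does not actually test OOO behaviour.
        expected = {}
        queries = []
        for zone in ['.', 'example.', 'secure.example.']:
            expected[zone] = dns.rrset.from_text(zone, 0, dns.rdataclass.IN, 'SOA', self._SOA)
            query = dns.message.make_query(zone, 'SOA', want_dnssec=True)
            query.flags |= dns.flags.AD
            queries.append(query)

        expected['ns.secure.example.'] = dns.rrset.from_text('ns.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.9'.format(prefix=self._PREFIX))
        query = dns.message.make_query('ns.secure.example', 'A', want_dnssec=True)
        query.flags |= dns.flags.AD
        queries.append(query)

        ress = self.sendTCPQueries(queries)

        self.assertEqual(len(ress), len(expected))

        # Track which names are still unanswered. Without this, the same valid
        # reply repeated four times would satisfy the length check and every
        # per-reply assertion, and an unexpected question name would surface
        # as a KeyError instead of a test failure.
        pending = dict(expected)
        for res in ress:
            name = res.question[0].name.to_text()
            self.assertIn(name, pending)
            exp = pending.pop(name)
            self.assertMessageIsAuthenticated(res)
            self.assertRRsetInAnswer(res, exp)
            self.assertMatchingRRSIGInAnswer(res, exp)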
134 |