]>
Commit | Line | Data |
---|---|---|
838adb89 PL |
1 | #!/usr/bin/env python |
2 | ||
3 | from __future__ import print_function | |
4 | ||
5 | import threading | |
6 | import unittest | |
7 | ||
8 | import dns | |
9 | from twisted.internet.protocol import DatagramProtocol | |
10 | from twisted.internet import reactor | |
11 | ||
12 | from authtests import AuthTest | |
13 | ||
14 | aliasUDPReactorRunning = False | |
15 | ||
16 | ||
17 | class TestALIAS(AuthTest): | |
18 | _zones = { | |
19 | 'example.org': """ | |
20 | example.org. 3600 IN SOA {soa} | |
21 | example.org. 3600 IN NS ns1.example.org. | |
22 | example.org. 3600 IN NS ns2.example.org. | |
23 | ns1.example.org. 3600 IN A {prefix}.10 | |
24 | ns2.example.org. 3600 IN A {prefix}.11 | |
25 | ||
26 | noerror.example.org. 3600 IN ALIAS noerror.example.com. | |
27 | nxd.example.org. 3600 IN ALIAS nxd.example.com. | |
28 | servfail.example.org. 3600 IN ALIAS servfail.example.com | |
29 | """, | |
30 | } | |
31 | ||
32 | @classmethod | |
33 | def startResponders(cls): | |
34 | global aliasUDPReactorRunning | |
35 | ||
36 | address = cls._PREFIX + '.1' | |
37 | port = 5301 | |
38 | ||
39 | if not aliasUDPReactorRunning: | |
40 | reactor.listenUDP(port, AliasUDPResponder(), interface=address) | |
41 | ||
42 | aliasUDPReactorRunning = True | |
43 | ||
44 | if not reactor.running: | |
45 | cls._ALIASResponder = threading.Thread(name='ALIAS Responder', | |
46 | target=reactor.run, | |
47 | args=(False,)) | |
48 | cls._ALIASResponder.setDaemon(True) | |
49 | cls._ALIASResponder.start() | |
50 | ||
51 | def testNoError(self): | |
52 | expected_a = [dns.rrset.from_text('noerror.example.org.', | |
53 | 0, dns.rdataclass.IN, 'A', | |
54 | '192.0.2.1')] | |
55 | expected_aaaa = [dns.rrset.from_text('noerror.example.org.', | |
56 | 0, dns.rdataclass.IN, 'AAAA', | |
57 | '2001:DB8::1')] | |
58 | ||
59 | query = dns.message.make_query('noerror.example.org', 'A') | |
60 | res = self.sendUDPQuery(query) | |
61 | self.assertRcodeEqual(res, dns.rcode.NOERROR) | |
62 | self.assertAnyRRsetInAnswer(res, expected_a) | |
63 | ||
64 | query = dns.message.make_query('noerror.example.org', 'AAAA') | |
65 | res = self.sendUDPQuery(query) | |
66 | self.assertRcodeEqual(res, dns.rcode.NOERROR) | |
67 | self.assertAnyRRsetInAnswer(res, expected_aaaa) | |
68 | ||
69 | query = dns.message.make_query('noerror.example.org', 'ANY') | |
70 | res = self.sendUDPQuery(query) | |
71 | self.assertRcodeEqual(res, dns.rcode.NOERROR) | |
72 | self.assertAnyRRsetInAnswer(res, expected_a) | |
73 | self.assertAnyRRsetInAnswer(res, expected_aaaa) | |
74 | ||
75 | # NODATA | |
76 | query = dns.message.make_query('noerror.example.org', 'MX') | |
77 | res = self.sendUDPQuery(query) | |
78 | self.assertRcodeEqual(res, dns.rcode.NOERROR) | |
79 | self.assertEqual(len(res.answer), 0) | |
80 | ||
81 | def testNxDomain(self): | |
82 | query = dns.message.make_query('nxd.example.org', 'A') | |
83 | res = self.sendUDPQuery(query) | |
84 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
85 | ||
86 | query = dns.message.make_query('nxd.example.org', 'AAAA') | |
87 | res = self.sendUDPQuery(query) | |
88 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
89 | ||
90 | # TODO this should actually return SOA + NS? | |
91 | query = dns.message.make_query('nxd.example.org', 'ANY') | |
92 | res = self.sendUDPQuery(query) | |
93 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
94 | ||
95 | def testServFail(self): | |
96 | query = dns.message.make_query('servfail.example.org', 'A') | |
97 | res = self.sendUDPQuery(query) | |
98 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
99 | ||
100 | query = dns.message.make_query('servfail.example.org', 'AAAA') | |
101 | res = self.sendUDPQuery(query) | |
102 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
103 | ||
104 | # TODO this should actually return SOA + NS? | |
105 | query = dns.message.make_query('servfail.example.org', 'ANY') | |
106 | res = self.sendUDPQuery(query) | |
107 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
108 | ||
109 | def testNoErrorTCP(self): | |
110 | expected_a = [dns.rrset.from_text('noerror.example.org.', | |
111 | 0, dns.rdataclass.IN, 'A', | |
112 | '192.0.2.1')] | |
113 | expected_aaaa = [dns.rrset.from_text('noerror.example.org.', | |
114 | 0, dns.rdataclass.IN, 'AAAA', | |
115 | '2001:DB8::1')] | |
116 | ||
117 | query = dns.message.make_query('noerror.example.org', 'A') | |
118 | res = self.sendTCPQuery(query) | |
119 | self.assertRcodeEqual(res, dns.rcode.NOERROR) | |
120 | self.assertAnyRRsetInAnswer(res, expected_a) | |
121 | ||
122 | query = dns.message.make_query('noerror.example.org', 'AAAA') | |
123 | res = self.sendTCPQuery(query) | |
124 | self.assertRcodeEqual(res, dns.rcode.NOERROR) | |
125 | self.assertAnyRRsetInAnswer(res, expected_aaaa) | |
126 | ||
127 | query = dns.message.make_query('noerror.example.org', 'ANY') | |
128 | res = self.sendTCPQuery(query) | |
129 | self.assertRcodeEqual(res, dns.rcode.NOERROR) | |
130 | self.assertAnyRRsetInAnswer(res, expected_a) | |
131 | self.assertAnyRRsetInAnswer(res, expected_aaaa) | |
132 | ||
133 | # NODATA | |
134 | query = dns.message.make_query('noerror.example.org', 'MX') | |
135 | res = self.sendTCPQuery(query) | |
136 | self.assertRcodeEqual(res, dns.rcode.NOERROR) | |
137 | self.assertEqual(len(res.answer), 0) | |
138 | ||
139 | def testNxDomainTCP(self): | |
140 | query = dns.message.make_query('nxd.example.org', 'A') | |
141 | res = self.sendTCPQuery(query) | |
142 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
143 | ||
144 | query = dns.message.make_query('nxd.example.org', 'AAAA') | |
145 | res = self.sendTCPQuery(query) | |
146 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
147 | ||
148 | query = dns.message.make_query('nxd.example.org', 'ANY') | |
149 | res = self.sendTCPQuery(query) | |
150 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
151 | ||
152 | def testServFailTCP(self): | |
153 | query = dns.message.make_query('servfail.example.org', 'A') | |
154 | res = self.sendTCPQuery(query) | |
155 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
156 | ||
157 | query = dns.message.make_query('servfail.example.org', 'AAAA') | |
158 | res = self.sendTCPQuery(query) | |
159 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
160 | ||
161 | query = dns.message.make_query('servfail.example.org', 'ANY') | |
162 | res = self.sendTCPQuery(query) | |
163 | self.assertRcodeEqual(res, dns.rcode.SERVFAIL) | |
164 | ||
165 | ||
166 | class AliasUDPResponder(DatagramProtocol): | |
167 | def datagramReceived(self, datagram, address): | |
168 | request = dns.message.from_wire(datagram) | |
169 | response = dns.message.make_response(request) | |
170 | response.use_edns(edns=False) | |
171 | response.flags |= dns.flags.RA | |
172 | ||
173 | if request.question[0].name == dns.name.from_text( | |
174 | 'noerror.example.com.'): | |
175 | response.set_rcode(dns.rcode.NOERROR) | |
176 | if request.question[0].rdtype in [dns.rdatatype.A, | |
177 | dns.rdatatype.ANY]: | |
178 | response.answer.append( | |
179 | dns.rrset.from_text( | |
180 | request.question[0].name, | |
181 | 0, dns.rdataclass.IN, 'A', '192.0.2.1')) | |
182 | ||
183 | if request.question[0].rdtype in [dns.rdatatype.AAAA, | |
184 | dns.rdatatype.ANY]: | |
185 | response.answer.append( | |
186 | dns.rrset.from_text(request.question[0].name, | |
187 | 0, dns.rdataclass.IN, 'AAAA', | |
188 | '2001:DB8::1')) | |
189 | if request.question[0].name == dns.name.from_text( | |
190 | 'nxd.example.com.'): | |
191 | response.set_rcode(dns.rcode.NXDOMAIN) | |
192 | response.authority.append( | |
193 | dns.rrset.from_text( | |
194 | 'example.com.', | |
195 | 0, dns.rdataclass.IN, 'SOA', 'ns1.example.com. hostmaster.example.com. 2018062101 1 2 3 4')) | |
196 | ||
197 | if request.question[0].name == dns.name.from_text( | |
198 | 'servfail.example.com.'): | |
199 | response.set_rcode(dns.rcode.SERVFAIL) | |
200 | ||
201 | self.transport.write(response.to_wire(max_size=65535), address) |