]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_ALIAS.py
Make sure we can install unsigned packages.
[thirdparty/pdns.git] / regression-tests.auth-py / test_ALIAS.py
1 #!/usr/bin/env python
2
3 from __future__ import print_function
4
5 import threading
6 import unittest
7
8 import dns
9 from twisted.internet.protocol import DatagramProtocol
10 from twisted.internet import reactor
11
12 from authtests import AuthTest
13
14 aliasUDPReactorRunning = False
15
16
17 class TestALIAS(AuthTest):
18 _config_template = """
19 expand-alias=yes
20 resolver=%s.1:5301
21 any-to-tcp=no
22 launch=bind
23 """
24
25 _config_params = ['_PREFIX']
26
27 _zones = {
28 'example.org': """
29 example.org. 3600 IN SOA {soa}
30 example.org. 3600 IN NS ns1.example.org.
31 example.org. 3600 IN NS ns2.example.org.
32 ns1.example.org. 3600 IN A {prefix}.10
33 ns2.example.org. 3600 IN A {prefix}.11
34
35 noerror.example.org. 3600 IN ALIAS noerror.example.com.
36 nxd.example.org. 3600 IN ALIAS nxd.example.com.
37 servfail.example.org. 3600 IN ALIAS servfail.example.com
38 """,
39 }
40
41 @classmethod
42 def startResponders(cls):
43 global aliasUDPReactorRunning
44
45 address = cls._PREFIX + '.1'
46 port = 5301
47
48 if not aliasUDPReactorRunning:
49 reactor.listenUDP(port, AliasUDPResponder(), interface=address)
50
51 aliasUDPReactorRunning = True
52
53 if not reactor.running:
54 cls._ALIASResponder = threading.Thread(name='ALIAS Responder',
55 target=reactor.run,
56 args=(False,))
57 cls._ALIASResponder.setDaemon(True)
58 cls._ALIASResponder.start()
59
60 def testNoError(self):
61 expected_a = [dns.rrset.from_text('noerror.example.org.',
62 0, dns.rdataclass.IN, 'A',
63 '192.0.2.1')]
64 expected_aaaa = [dns.rrset.from_text('noerror.example.org.',
65 0, dns.rdataclass.IN, 'AAAA',
66 '2001:DB8::1')]
67
68 query = dns.message.make_query('noerror.example.org', 'A')
69 res = self.sendUDPQuery(query)
70 self.assertRcodeEqual(res, dns.rcode.NOERROR)
71 self.assertAnyRRsetInAnswer(res, expected_a)
72
73 query = dns.message.make_query('noerror.example.org', 'AAAA')
74 res = self.sendUDPQuery(query)
75 self.assertRcodeEqual(res, dns.rcode.NOERROR)
76 self.assertAnyRRsetInAnswer(res, expected_aaaa)
77
78 query = dns.message.make_query('noerror.example.org', 'ANY')
79 res = self.sendUDPQuery(query)
80 self.assertRcodeEqual(res, dns.rcode.NOERROR)
81 self.assertAnyRRsetInAnswer(res, expected_a)
82 self.assertAnyRRsetInAnswer(res, expected_aaaa)
83
84 # NODATA
85 query = dns.message.make_query('noerror.example.org', 'MX')
86 res = self.sendUDPQuery(query)
87 self.assertRcodeEqual(res, dns.rcode.NOERROR)
88 self.assertEqual(len(res.answer), 0)
89
90 def testNxDomain(self):
91 query = dns.message.make_query('nxd.example.org', 'A')
92 res = self.sendUDPQuery(query)
93 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
94
95 query = dns.message.make_query('nxd.example.org', 'AAAA')
96 res = self.sendUDPQuery(query)
97 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
98
99 # TODO this should actually return SOA + NS?
100 query = dns.message.make_query('nxd.example.org', 'ANY')
101 res = self.sendUDPQuery(query)
102 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
103
104 def testServFail(self):
105 query = dns.message.make_query('servfail.example.org', 'A')
106 res = self.sendUDPQuery(query)
107 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
108
109 query = dns.message.make_query('servfail.example.org', 'AAAA')
110 res = self.sendUDPQuery(query)
111 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
112
113 # TODO this should actually return SOA + NS?
114 query = dns.message.make_query('servfail.example.org', 'ANY')
115 res = self.sendUDPQuery(query)
116 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
117
118 def testNoErrorTCP(self):
119 expected_a = [dns.rrset.from_text('noerror.example.org.',
120 0, dns.rdataclass.IN, 'A',
121 '192.0.2.1')]
122 expected_aaaa = [dns.rrset.from_text('noerror.example.org.',
123 0, dns.rdataclass.IN, 'AAAA',
124 '2001:DB8::1')]
125
126 query = dns.message.make_query('noerror.example.org', 'A')
127 res = self.sendTCPQuery(query)
128 self.assertRcodeEqual(res, dns.rcode.NOERROR)
129 self.assertAnyRRsetInAnswer(res, expected_a)
130
131 query = dns.message.make_query('noerror.example.org', 'AAAA')
132 res = self.sendTCPQuery(query)
133 self.assertRcodeEqual(res, dns.rcode.NOERROR)
134 self.assertAnyRRsetInAnswer(res, expected_aaaa)
135
136 query = dns.message.make_query('noerror.example.org', 'ANY')
137 res = self.sendTCPQuery(query)
138 self.assertRcodeEqual(res, dns.rcode.NOERROR)
139 self.assertAnyRRsetInAnswer(res, expected_a)
140 self.assertAnyRRsetInAnswer(res, expected_aaaa)
141
142 # NODATA
143 query = dns.message.make_query('noerror.example.org', 'MX')
144 res = self.sendTCPQuery(query)
145 self.assertRcodeEqual(res, dns.rcode.NOERROR)
146 self.assertEqual(len(res.answer), 0)
147
148 def testNxDomainTCP(self):
149 query = dns.message.make_query('nxd.example.org', 'A')
150 res = self.sendTCPQuery(query)
151 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
152
153 query = dns.message.make_query('nxd.example.org', 'AAAA')
154 res = self.sendTCPQuery(query)
155 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
156
157 query = dns.message.make_query('nxd.example.org', 'ANY')
158 res = self.sendTCPQuery(query)
159 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
160
161 def testServFailTCP(self):
162 query = dns.message.make_query('servfail.example.org', 'A')
163 res = self.sendTCPQuery(query)
164 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
165
166 query = dns.message.make_query('servfail.example.org', 'AAAA')
167 res = self.sendTCPQuery(query)
168 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
169
170 query = dns.message.make_query('servfail.example.org', 'ANY')
171 res = self.sendTCPQuery(query)
172 self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
173
174
175 class AliasUDPResponder(DatagramProtocol):
176 def datagramReceived(self, datagram, address):
177 request = dns.message.from_wire(datagram)
178 response = dns.message.make_response(request)
179 response.use_edns(edns=False)
180 response.flags |= dns.flags.RA
181
182 if request.question[0].name == dns.name.from_text(
183 'noerror.example.com.'):
184 response.set_rcode(dns.rcode.NOERROR)
185 if request.question[0].rdtype in [dns.rdatatype.A,
186 dns.rdatatype.ANY]:
187 response.answer.append(
188 dns.rrset.from_text(
189 request.question[0].name,
190 0, dns.rdataclass.IN, 'A', '192.0.2.1'))
191
192 if request.question[0].rdtype in [dns.rdatatype.AAAA,
193 dns.rdatatype.ANY]:
194 response.answer.append(
195 dns.rrset.from_text(request.question[0].name,
196 0, dns.rdataclass.IN, 'AAAA',
197 '2001:DB8::1'))
198 if request.question[0].name == dns.name.from_text(
199 'nxd.example.com.'):
200 response.set_rcode(dns.rcode.NXDOMAIN)
201 response.authority.append(
202 dns.rrset.from_text(
203 'example.com.',
204 0, dns.rdataclass.IN, 'SOA', 'ns1.example.com. hostmaster.example.com. 2018062101 1 2 3 4'))
205
206 if request.question[0].name == dns.name.from_text(
207 'servfail.example.com.'):
208 response.set_rcode(dns.rcode.SERVFAIL)
209
210 self.transport.write(response.to_wire(max_size=65535), address)