]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.auth-py/test_ALIAS.py
Merge pull request #7026 from jsoref/configure-enable-with
[thirdparty/pdns.git] / regression-tests.auth-py / test_ALIAS.py
CommitLineData
838adb89
PL
1#!/usr/bin/env python
2
3from __future__ import print_function
4
5import threading
6import unittest
7
8import dns
9from twisted.internet.protocol import DatagramProtocol
10from twisted.internet import reactor
11
12from authtests import AuthTest
13
# Module-wide guard: flipped to True by TestALIAS.startResponders() once the
# ALIAS UDP responder has been registered with the Twisted reactor, so that
# later calls do not try to listen on the same address/port again.
aliasUDPReactorRunning = False
15
16
class TestALIAS(AuthTest):
    """Regression tests for ALIAS records, queried over UDP and TCP.

    The ``example.org`` zone below carries three ALIAS records whose targets
    live under ``example.com``.  AliasUDPResponder, started by
    startResponders(), scripts the answers for those targets: A/AAAA records
    for ``noerror``, NXDOMAIN for ``nxd`` and SERVFAIL for ``servfail``.

    NOTE(review): how the server under test is pointed at the responder on
    port 5301 is not visible in this file; presumably AuthTest configures it.
    """

    # ``{soa}`` and ``{prefix}`` are placeholders; presumably AuthTest fills
    # them in when it writes the zone file -- confirm in authtests.
    #
    # NOTE(review): the ``servfail`` ALIAS target has no trailing dot, unlike
    # the ``noerror`` and ``nxd`` targets.  Left exactly as found because
    # changing zone data could change what the server resolves; confirm
    # whether the omission is intentional.
    _zones = {
        'example.org': """
example.org. 3600 IN SOA {soa}
example.org. 3600 IN NS ns1.example.org.
example.org. 3600 IN NS ns2.example.org.
ns1.example.org. 3600 IN A {prefix}.10
ns2.example.org. 3600 IN A {prefix}.11

noerror.example.org. 3600 IN ALIAS noerror.example.com.
nxd.example.org. 3600 IN ALIAS nxd.example.com.
servfail.example.org. 3600 IN ALIAS servfail.example.com
        """,
    }

    @classmethod
    def startResponders(cls):
        """Register the ALIAS responder and make sure the reactor is running.

        The responder is bound to ``<_PREFIX>.1`` port 5301 only once per
        process (tracked by the module-level ``aliasUDPReactorRunning`` flag).
        If the Twisted reactor is not running yet, it is started in a daemon
        thread so it never keeps the test process alive.
        """
        global aliasUDPReactorRunning

        address = cls._PREFIX + '.1'
        port = 5301

        if not aliasUDPReactorRunning:
            reactor.listenUDP(port, AliasUDPResponder(), interface=address)
            aliasUDPReactorRunning = True

        if not reactor.running:
            # The single positional argument is reactor.run()'s
            # installSignalHandlers flag: signal handlers can only be
            # installed from the main thread, so they are disabled here.
            cls._ALIASResponder = threading.Thread(name='ALIAS Responder',
                                                   target=reactor.run,
                                                   args=(False,))
            # Thread.setDaemon() is deprecated; the attribute is equivalent.
            cls._ALIASResponder.daemon = True
            cls._ALIASResponder.start()

    def _checkResolves(self, sendQuery):
        """Check that ``noerror.example.org`` is answered from its target.

        ``sendQuery`` is the bound send method to use (``self.sendUDPQuery``
        or ``self.sendTCPQuery``); it takes a query message and returns the
        response message.
        """
        expected_a = [dns.rrset.from_text('noerror.example.org.',
                                          0, dns.rdataclass.IN, 'A',
                                          '192.0.2.1')]
        expected_aaaa = [dns.rrset.from_text('noerror.example.org.',
                                             0, dns.rdataclass.IN, 'AAAA',
                                             '2001:DB8::1')]

        # Each query type is paired with the rrset lists its answer section
        # must contain; ANY has to carry both address families.
        scenarios = (
            ('A', (expected_a,)),
            ('AAAA', (expected_aaaa,)),
            ('ANY', (expected_a, expected_aaaa)),
        )
        for qtype, expectations in scenarios:
            query = dns.message.make_query('noerror.example.org', qtype)
            res = sendQuery(query)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            for expected in expectations:
                self.assertAnyRRsetInAnswer(res, expected)

        # NODATA
        query = dns.message.make_query('noerror.example.org', 'MX')
        res = sendQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertEqual(len(res.answer), 0)

    def _checkServfail(self, sendQuery, qname):
        """Check that A, AAAA and ANY queries for ``qname`` yield SERVFAIL.

        ``sendQuery`` is the bound send method to use (``self.sendUDPQuery``
        or ``self.sendTCPQuery``).
        """
        # TODO the ANY query should actually return SOA + NS?
        for qtype in ('A', 'AAAA', 'ANY'):
            query = dns.message.make_query(qname, qtype)
            res = sendQuery(query)
            self.assertRcodeEqual(res, dns.rcode.SERVFAIL)

    def testNoError(self):
        """ALIAS to a resolvable target is expanded (UDP)."""
        self._checkResolves(self.sendUDPQuery)

    def testNxDomain(self):
        """ALIAS whose target gets NXDOMAIN upstream yields SERVFAIL (UDP)."""
        self._checkServfail(self.sendUDPQuery, 'nxd.example.org')

    def testServFail(self):
        """The ``servfail`` ALIAS name yields SERVFAIL (UDP)."""
        self._checkServfail(self.sendUDPQuery, 'servfail.example.org')

    def testNoErrorTCP(self):
        """ALIAS to a resolvable target is expanded (TCP)."""
        self._checkResolves(self.sendTCPQuery)

    def testNxDomainTCP(self):
        """ALIAS whose target gets NXDOMAIN upstream yields SERVFAIL (TCP)."""
        self._checkServfail(self.sendTCPQuery, 'nxd.example.org')

    def testServFailTCP(self):
        """The ``servfail`` ALIAS name yields SERVFAIL (TCP)."""
        self._checkServfail(self.sendTCPQuery, 'servfail.example.org')
164
165
class AliasUDPResponder(DatagramProtocol):
    """Twisted UDP protocol that returns canned DNS answers by query name.

    * ``noerror.example.com.``  -> NOERROR, with an A and/or AAAA record
      depending on the query type (ANY gets both, other types get none)
    * ``nxd.example.com.``      -> NXDOMAIN with an ``example.com.`` SOA in
      the authority section
    * ``servfail.example.com.`` -> SERVFAIL

    Any other name receives the unmodified response skeleton.
    """

    def datagramReceived(self, datagram, address):
        """Decode one DNS query and write the scripted reply to ``address``."""
        request = dns.message.from_wire(datagram)

        # Response skeleton: EDNS disabled, RA flag set.
        response = dns.message.make_response(request)
        response.use_edns(edns=False)
        response.flags |= dns.flags.RA

        # Only the first question is looked at.
        question = request.question[0]
        qname = question.name
        qtype = question.rdtype

        if qname == dns.name.from_text('noerror.example.com.'):
            response.set_rcode(dns.rcode.NOERROR)

            # The two checks are independent so that ANY receives both
            # address families.
            if qtype in [dns.rdatatype.A, dns.rdatatype.ANY]:
                a_rrset = dns.rrset.from_text(
                    qname, 0, dns.rdataclass.IN, 'A', '192.0.2.1')
                response.answer.append(a_rrset)

            if qtype in [dns.rdatatype.AAAA, dns.rdatatype.ANY]:
                aaaa_rrset = dns.rrset.from_text(
                    qname, 0, dns.rdataclass.IN, 'AAAA', '2001:DB8::1')
                response.answer.append(aaaa_rrset)

        elif qname == dns.name.from_text('nxd.example.com.'):
            response.set_rcode(dns.rcode.NXDOMAIN)
            soa_rrset = dns.rrset.from_text(
                'example.com.',
                0, dns.rdataclass.IN, 'SOA',
                'ns1.example.com. hostmaster.example.com. 2018062101 1 2 3 4')
            response.authority.append(soa_rrset)

        elif qname == dns.name.from_text('servfail.example.com.'):
            response.set_rcode(dns.rcode.SERVFAIL)

        wire = response.to_wire(max_size=65535)
        self.transport.write(wire, address)