]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_ALIAS.py
3 from __future__
import print_function
9 from twisted
.internet
.protocol
import DatagramProtocol
10 from twisted
.internet
import reactor
12 from authtests
import AuthTest
14 aliasUDPReactorRunning
= False
17 class TestALIAS(AuthTest
):
18 _config_template
= """
25 _config_params
= ['_PREFIX']
29 example.org. 3600 IN SOA {soa}
30 example.org. 3600 IN NS ns1.example.org.
31 example.org. 3600 IN NS ns2.example.org.
32 ns1.example.org. 3600 IN A {prefix}.10
33 ns2.example.org. 3600 IN A {prefix}.11
35 noerror.example.org. 3600 IN ALIAS noerror.example.com.
36 nxd.example.org. 3600 IN ALIAS nxd.example.com.
37 servfail.example.org. 3600 IN ALIAS servfail.example.com
42 def startResponders(cls
):
43 global aliasUDPReactorRunning
45 address
= cls
._PREFIX
+ '.1'
48 if not aliasUDPReactorRunning
:
49 reactor
.listenUDP(port
, AliasUDPResponder(), interface
=address
)
51 aliasUDPReactorRunning
= True
53 if not reactor
.running
:
54 cls
._ALIASResponder
= threading
.Thread(name
='ALIAS Responder',
57 cls
._ALIASResponder
.setDaemon(True)
58 cls
._ALIASResponder
.start()
60 def testNoError(self
):
61 expected_a
= [dns
.rrset
.from_text('noerror.example.org.',
62 0, dns
.rdataclass
.IN
, 'A',
64 expected_aaaa
= [dns
.rrset
.from_text('noerror.example.org.',
65 0, dns
.rdataclass
.IN
, 'AAAA',
68 query
= dns
.message
.make_query('noerror.example.org', 'A')
69 res
= self
.sendUDPQuery(query
)
70 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
71 self
.assertAnyRRsetInAnswer(res
, expected_a
)
73 query
= dns
.message
.make_query('noerror.example.org', 'AAAA')
74 res
= self
.sendUDPQuery(query
)
75 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
76 self
.assertAnyRRsetInAnswer(res
, expected_aaaa
)
78 query
= dns
.message
.make_query('noerror.example.org', 'ANY')
79 res
= self
.sendUDPQuery(query
)
80 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
81 self
.assertAnyRRsetInAnswer(res
, expected_a
)
82 self
.assertAnyRRsetInAnswer(res
, expected_aaaa
)
85 query
= dns
.message
.make_query('noerror.example.org', 'MX')
86 res
= self
.sendUDPQuery(query
)
87 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
88 self
.assertEqual(len(res
.answer
), 0)
90 def testNxDomain(self
):
91 query
= dns
.message
.make_query('nxd.example.org', 'A')
92 res
= self
.sendUDPQuery(query
)
93 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
95 query
= dns
.message
.make_query('nxd.example.org', 'AAAA')
96 res
= self
.sendUDPQuery(query
)
97 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
99 # TODO this should actually return SOA + NS?
100 query
= dns
.message
.make_query('nxd.example.org', 'ANY')
101 res
= self
.sendUDPQuery(query
)
102 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
104 def testServFail(self
):
105 query
= dns
.message
.make_query('servfail.example.org', 'A')
106 res
= self
.sendUDPQuery(query
)
107 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
109 query
= dns
.message
.make_query('servfail.example.org', 'AAAA')
110 res
= self
.sendUDPQuery(query
)
111 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
113 # TODO this should actually return SOA + NS?
114 query
= dns
.message
.make_query('servfail.example.org', 'ANY')
115 res
= self
.sendUDPQuery(query
)
116 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
118 def testNoErrorTCP(self
):
119 expected_a
= [dns
.rrset
.from_text('noerror.example.org.',
120 0, dns
.rdataclass
.IN
, 'A',
122 expected_aaaa
= [dns
.rrset
.from_text('noerror.example.org.',
123 0, dns
.rdataclass
.IN
, 'AAAA',
126 query
= dns
.message
.make_query('noerror.example.org', 'A')
127 res
= self
.sendTCPQuery(query
)
128 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
129 self
.assertAnyRRsetInAnswer(res
, expected_a
)
131 query
= dns
.message
.make_query('noerror.example.org', 'AAAA')
132 res
= self
.sendTCPQuery(query
)
133 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
134 self
.assertAnyRRsetInAnswer(res
, expected_aaaa
)
136 query
= dns
.message
.make_query('noerror.example.org', 'ANY')
137 res
= self
.sendTCPQuery(query
)
138 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
139 self
.assertAnyRRsetInAnswer(res
, expected_a
)
140 self
.assertAnyRRsetInAnswer(res
, expected_aaaa
)
143 query
= dns
.message
.make_query('noerror.example.org', 'MX')
144 res
= self
.sendTCPQuery(query
)
145 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
146 self
.assertEqual(len(res
.answer
), 0)
148 def testNxDomainTCP(self
):
149 query
= dns
.message
.make_query('nxd.example.org', 'A')
150 res
= self
.sendTCPQuery(query
)
151 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
153 query
= dns
.message
.make_query('nxd.example.org', 'AAAA')
154 res
= self
.sendTCPQuery(query
)
155 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
157 query
= dns
.message
.make_query('nxd.example.org', 'ANY')
158 res
= self
.sendTCPQuery(query
)
159 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
161 def testServFailTCP(self
):
162 query
= dns
.message
.make_query('servfail.example.org', 'A')
163 res
= self
.sendTCPQuery(query
)
164 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
166 query
= dns
.message
.make_query('servfail.example.org', 'AAAA')
167 res
= self
.sendTCPQuery(query
)
168 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
170 query
= dns
.message
.make_query('servfail.example.org', 'ANY')
171 res
= self
.sendTCPQuery(query
)
172 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
175 class AliasUDPResponder(DatagramProtocol
):
176 def datagramReceived(self
, datagram
, address
):
177 request
= dns
.message
.from_wire(datagram
)
178 response
= dns
.message
.make_response(request
)
179 response
.use_edns(edns
=False)
180 response
.flags |
= dns
.flags
.RA
182 if request
.question
[0].name
== dns
.name
.from_text(
183 'noerror.example.com.'):
184 response
.set_rcode(dns
.rcode
.NOERROR
)
185 if request
.question
[0].rdtype
in [dns
.rdatatype
.A
,
187 response
.answer
.append(
189 request
.question
[0].name
,
190 0, dns
.rdataclass
.IN
, 'A', '192.0.2.1'))
192 if request
.question
[0].rdtype
in [dns
.rdatatype
.AAAA
,
194 response
.answer
.append(
195 dns
.rrset
.from_text(request
.question
[0].name
,
196 0, dns
.rdataclass
.IN
, 'AAAA',
198 if request
.question
[0].name
== dns
.name
.from_text(
200 response
.set_rcode(dns
.rcode
.NXDOMAIN
)
201 response
.authority
.append(
204 0, dns
.rdataclass
.IN
, 'SOA', 'ns1.example.com. hostmaster.example.com. 2018062101 1 2 3 4'))
206 if request
.question
[0].name
== dns
.name
.from_text(
207 'servfail.example.com.'):
208 response
.set_rcode(dns
.rcode
.SERVFAIL
)
210 self
.transport
.write(response
.to_wire(max_size
=65535), address
)