]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.auth-py/test_ALIAS.py
3 from __future__
import print_function
9 from twisted
.internet
.protocol
import DatagramProtocol
10 from twisted
.internet
import reactor
12 from authtests
import AuthTest
14 aliasUDPReactorRunning
= False
17 class TestALIAS(AuthTest
):
20 example.org. 3600 IN SOA {soa}
21 example.org. 3600 IN NS ns1.example.org.
22 example.org. 3600 IN NS ns2.example.org.
23 ns1.example.org. 3600 IN A {prefix}.10
24 ns2.example.org. 3600 IN A {prefix}.11
26 noerror.example.org. 3600 IN ALIAS noerror.example.com.
27 nxd.example.org. 3600 IN ALIAS nxd.example.com.
28 servfail.example.org. 3600 IN ALIAS servfail.example.com
33 def startResponders(cls
):
34 global aliasUDPReactorRunning
36 address
= cls
._PREFIX
+ '.1'
39 if not aliasUDPReactorRunning
:
40 reactor
.listenUDP(port
, AliasUDPResponder(), interface
=address
)
42 aliasUDPReactorRunning
= True
44 if not reactor
.running
:
45 cls
._ALIASResponder
= threading
.Thread(name
='ALIAS Responder',
48 cls
._ALIASResponder
.setDaemon(True)
49 cls
._ALIASResponder
.start()
51 def testNoError(self
):
52 expected_a
= [dns
.rrset
.from_text('noerror.example.org.',
53 0, dns
.rdataclass
.IN
, 'A',
55 expected_aaaa
= [dns
.rrset
.from_text('noerror.example.org.',
56 0, dns
.rdataclass
.IN
, 'AAAA',
59 query
= dns
.message
.make_query('noerror.example.org', 'A')
60 res
= self
.sendUDPQuery(query
)
61 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
62 self
.assertAnyRRsetInAnswer(res
, expected_a
)
64 query
= dns
.message
.make_query('noerror.example.org', 'AAAA')
65 res
= self
.sendUDPQuery(query
)
66 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
67 self
.assertAnyRRsetInAnswer(res
, expected_aaaa
)
69 query
= dns
.message
.make_query('noerror.example.org', 'ANY')
70 res
= self
.sendUDPQuery(query
)
71 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
72 self
.assertAnyRRsetInAnswer(res
, expected_a
)
73 self
.assertAnyRRsetInAnswer(res
, expected_aaaa
)
76 query
= dns
.message
.make_query('noerror.example.org', 'MX')
77 res
= self
.sendUDPQuery(query
)
78 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
79 self
.assertEqual(len(res
.answer
), 0)
81 def testNxDomain(self
):
82 query
= dns
.message
.make_query('nxd.example.org', 'A')
83 res
= self
.sendUDPQuery(query
)
84 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
86 query
= dns
.message
.make_query('nxd.example.org', 'AAAA')
87 res
= self
.sendUDPQuery(query
)
88 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
90 # TODO this should actually return SOA + NS?
91 query
= dns
.message
.make_query('nxd.example.org', 'ANY')
92 res
= self
.sendUDPQuery(query
)
93 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
95 def testServFail(self
):
96 query
= dns
.message
.make_query('servfail.example.org', 'A')
97 res
= self
.sendUDPQuery(query
)
98 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
100 query
= dns
.message
.make_query('servfail.example.org', 'AAAA')
101 res
= self
.sendUDPQuery(query
)
102 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
104 # TODO this should actually return SOA + NS?
105 query
= dns
.message
.make_query('servfail.example.org', 'ANY')
106 res
= self
.sendUDPQuery(query
)
107 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
109 def testNoErrorTCP(self
):
110 expected_a
= [dns
.rrset
.from_text('noerror.example.org.',
111 0, dns
.rdataclass
.IN
, 'A',
113 expected_aaaa
= [dns
.rrset
.from_text('noerror.example.org.',
114 0, dns
.rdataclass
.IN
, 'AAAA',
117 query
= dns
.message
.make_query('noerror.example.org', 'A')
118 res
= self
.sendTCPQuery(query
)
119 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
120 self
.assertAnyRRsetInAnswer(res
, expected_a
)
122 query
= dns
.message
.make_query('noerror.example.org', 'AAAA')
123 res
= self
.sendTCPQuery(query
)
124 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
125 self
.assertAnyRRsetInAnswer(res
, expected_aaaa
)
127 query
= dns
.message
.make_query('noerror.example.org', 'ANY')
128 res
= self
.sendTCPQuery(query
)
129 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
130 self
.assertAnyRRsetInAnswer(res
, expected_a
)
131 self
.assertAnyRRsetInAnswer(res
, expected_aaaa
)
134 query
= dns
.message
.make_query('noerror.example.org', 'MX')
135 res
= self
.sendTCPQuery(query
)
136 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
137 self
.assertEqual(len(res
.answer
), 0)
139 def testNxDomainTCP(self
):
140 query
= dns
.message
.make_query('nxd.example.org', 'A')
141 res
= self
.sendTCPQuery(query
)
142 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
144 query
= dns
.message
.make_query('nxd.example.org', 'AAAA')
145 res
= self
.sendTCPQuery(query
)
146 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
148 query
= dns
.message
.make_query('nxd.example.org', 'ANY')
149 res
= self
.sendTCPQuery(query
)
150 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
152 def testServFailTCP(self
):
153 query
= dns
.message
.make_query('servfail.example.org', 'A')
154 res
= self
.sendTCPQuery(query
)
155 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
157 query
= dns
.message
.make_query('servfail.example.org', 'AAAA')
158 res
= self
.sendTCPQuery(query
)
159 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
161 query
= dns
.message
.make_query('servfail.example.org', 'ANY')
162 res
= self
.sendTCPQuery(query
)
163 self
.assertRcodeEqual(res
, dns
.rcode
.SERVFAIL
)
166 class AliasUDPResponder(DatagramProtocol
):
167 def datagramReceived(self
, datagram
, address
):
168 request
= dns
.message
.from_wire(datagram
)
169 response
= dns
.message
.make_response(request
)
170 response
.use_edns(edns
=False)
171 response
.flags |
= dns
.flags
.RA
173 if request
.question
[0].name
== dns
.name
.from_text(
174 'noerror.example.com.'):
175 response
.set_rcode(dns
.rcode
.NOERROR
)
176 if request
.question
[0].rdtype
in [dns
.rdatatype
.A
,
178 response
.answer
.append(
180 request
.question
[0].name
,
181 0, dns
.rdataclass
.IN
, 'A', '192.0.2.1'))
183 if request
.question
[0].rdtype
in [dns
.rdatatype
.AAAA
,
185 response
.answer
.append(
186 dns
.rrset
.from_text(request
.question
[0].name
,
187 0, dns
.rdataclass
.IN
, 'AAAA',
189 if request
.question
[0].name
== dns
.name
.from_text(
191 response
.set_rcode(dns
.rcode
.NXDOMAIN
)
192 response
.authority
.append(
195 0, dns
.rdataclass
.IN
, 'SOA', 'ns1.example.com. hostmaster.example.com. 2018062101 1 2 3 4'))
197 if request
.question
[0].name
== dns
.name
.from_text(
198 'servfail.example.com.'):
199 response
.set_rcode(dns
.rcode
.SERVFAIL
)
201 self
.transport
.write(response
.to_wire(max_size
=65535), address
)