# Origin: git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.ixfrdist/test_IXFR.py
import time

# NOTE(review): this listing starts at line 4 of the original file; the import
# that binds the `dns` name used throughout is presumably among the lines lost
# above this point - confirm against the upstream file.
from ixfrdisttests import IXFRDistTest
from xfrserver.xfrserver import AXFRServer
10 @ 86400 SOA foo bar 1 2 3 4 5
11 @ 4242 NS ns1.example.
12 @ 4242 NS ns2.example.
13 ns1.example. 4242 A 192.0.2.1
14 ns2.example. 4242 A 192.0.2.2
18 @ 86400 SOA foo bar 2 2 3 4 5
19 @ 4242 NS ns1.example.
20 @ 4242 NS ns2.example.
21 ns1.example. 4242 A 192.0.2.1
22 ns2.example. 4242 A 192.0.2.2
23 newrecord.example. 8484 A 192.0.2.42
# Module-level AXFRServer instance built from xfrServerPort and the zones
# mapping. The test class drives it through moveToSerial() and
# getServedSerial(), and reads expected records from it.
xfrServer = AXFRServer(xfrServerPort, zones)
class IXFRDistBasicTest(IXFRDistTest):
    """
    This test makes sure that we correctly fetch a zone via AXFR, and provide the full AXFR and IXFR
    """

    # Counter bumped by waitUntilCorrectSerialIsLoaded() each time the wanted
    # serial is seen; defined here so the first increment has a value to read.
    _xfrDone = 0

    _config_domains = {
        'example': '127.0.0.1:' + str(xfrServerPort),   # zone for actual XFR testing
        'example2': '127.0.0.1:1',                      # bogus port is intentional - zone is intentionally unloadable
        # example3                                      # intentionally absent for 'unconfigured zone' testing
        'example4': '127.0.0.1:' + str(xfrServerPort),  # for testing how ixfrdist deals with getting the wrong zone on XFR
    }

    # NOTE(review): the original listing numbering jumps from 41 to 50 here, so
    # a setUpClass counterpart to tearDownClass may have been lost in
    # extraction - confirm against the upstream file.

    @classmethod
    def tearDownClass(cls):
        """Shut down the ixfrdist instance once all tests of this class ran."""
        cls.tearDownIXFRDist()

    def waitUntilCorrectSerialIsLoaded(self, serial, timeout=10):
        """
        Switch xfrServer to `serial` and poll until it reports that serial as served.

        xfrServer.getServedSerial() is polled up to `timeout` times with a one
        second pause between polls; self._xfrDone is incremented on success.

        Raises AssertionError when a serial higher than `serial` is reported,
        or when `serial` is still not reported after the last poll.
        """
        xfrServer.moveToSerial(serial)

        # Stays None only if `timeout` allows no poll at all; the final message
        # formats it with %s so that case reports cleanly instead of crashing.
        servedSerial = None
        attempts = 0
        while attempts < timeout:
            print('attempts=%s timeout=%s' % (attempts, timeout))
            servedSerial = xfrServer.getServedSerial()
            print('servedSerial=%s' % servedSerial)
            if servedSerial > serial:
                raise AssertionError("Expected serial %d, got %d" % (serial, servedSerial))
            if servedSerial == serial:
                self._xfrDone = self._xfrDone + 1
                return

            attempts = attempts + 1
            # One poll per second, which is what the failure message reports.
            time.sleep(1)

        raise AssertionError("Waited %d seconds for the serial to be updated to %d but the last served serial is still %s" % (timeout, serial, servedSerial))

    def checkFullZone(self, serial):
        """
        AXFR 'example.' over TCP and compare the answers with zones[serial].

        The expected data is rebuilt from the zone text as one RRset per
        rdataset; the middle message is compared order-independently.
        """
        # FIXME: 90% duplication from _getRecordsForSerial
        zone = []
        parsed = dns.zone.from_text(zones[serial], relativize=False)
        for n, rds in parsed.iterate_rdatasets():
            rrs = dns.rrset.RRset(n, rds.rdclass, rds.rdtype)
            rrs.update(rds)
            zone.append(rrs)

        def by_name_and_type(rrset):
            return (rrset.name, rrset.rdtype)

        # zone[0] is used as the SOA - assumes the SOA rdataset is yielded
        # first when iterating the parsed zone; TODO confirm.
        expected = [[zone[0]], sorted(zone[1:], key=by_name_and_type), [zone[0]]]  # AXFRs are SOA-wrapped

        query = dns.message.make_query('example.', 'AXFR')
        res = self.sendTCPQueryMultiResponse(query, count=len(expected)+1)  # +1 for trailing data check
        answers = [r.answer for r in res]
        answers[1].sort(key=by_name_and_type)
        self.assertEqual(answers, expected)

    def checkIXFR(self, fromserial, toserial):
        """
        IXFR 'example.' from `fromserial` and verify the diff up to `toserial`.

        The client's current SOA goes into the authority section of the query.
        The answers must match the expected sequence exactly, including order,
        and every record must carry the expected TTL.
        """
        soa1 = xfrServer._getSOAForSerial(fromserial)
        soa2 = xfrServer._getSOAForSerial(toserial)
        wanted = dns.name.from_text('newrecord.example.')
        newrecord = [r for r in xfrServer._getRecordsForSerial(toserial) if r.name == wanted]

        query = dns.message.make_query('example.', 'IXFR')
        query.authority = [soa1]

        # One answer list per response message: new SOA, old SOA, new SOA,
        # the added record(s), closing new SOA.
        expected = [[soa2], [soa1], [soa2], newrecord, [soa2]]
        res = self.sendTCPQueryMultiResponse(query, count=len(expected)+1)  # +1 for trailing data check
        answers = [r.answer for r in res]

        # Unlike checkFullZone, nothing is sorted here: the order is part of the check.
        self.assertEqual(answers, expected)

        # check the TTLs
        # NOTE(review): compared explicitly because dnspython rrset equality is
        # understood not to take the TTL into account - confirm.
        for answerPos, expectedAnswer in enumerate(expected):
            for pos, rec in enumerate(expectedAnswer):
                self.assertEqual(rec.ttl, answers[answerPos][pos].ttl)

    # Comments rather than docstrings are used on the test_* methods so that
    # test runners keep reporting them by method name.

    # Walks the upstream server through serials 1 and 2, checking the full
    # zone after each step, then checks the IXFR between the two.
    def test_a_XFR(self):
        self.waitUntilCorrectSerialIsLoaded(1)
        self.checkFullZone(1)

        self.waitUntilCorrectSerialIsLoaded(2)
        self.checkFullZone(2)

        self.checkIXFR(1, 2)

    # _b_ because we expect post-XFR testing state
    def test_b_UDP_SOA_existing(self):
        query = dns.message.make_query('example.', 'SOA')
        expected = dns.message.make_response(query)
        expected.answer.append(xfrServer._getSOAForSerial(2))

        response = self.sendUDPQuery(query)
        self.assertEqual(expected, response)

        # check the TTLs
        for pos, rec in enumerate(expected.answer):
            self.assertEqual(rec.ttl, response.answer[pos].ttl)

    # 'example2.' is configured with a bogus upstream port, so it never loads;
    # a SOA query for it must be REFUSED.
    def test_b_UDP_SOA_not_loaded(self):
        query = dns.message.make_query('example2.', 'SOA')
        expected = dns.message.make_response(query)
        expected.set_rcode(dns.rcode.REFUSED)

        response = self.sendUDPQuery(query)
        self.assertEqual(expected, response)

    # 'example3.' is absent from _config_domains; a SOA query for it must be
    # REFUSED as well.
    def test_b_UDP_SOA_not_configured(self):
        query = dns.message.make_query('example3.', 'SOA')
        expected = dns.message.make_response(query)
        expected.set_rcode(dns.rcode.REFUSED)

        response = self.sendUDPQuery(query)
        self.assertEqual(expected, response)