]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.ixfrdist/test_IXFR.py
4 from ixfrdisttests
import IXFRDistTest
5 from xfrserver
.xfrserver
import AXFRServer
10 @ 86400 SOA foo bar 1 2 3 4 5
11 @ 4242 NS ns1.example.
12 @ 4242 NS ns2.example.
13 ns1.example. 4242 A 192.0.2.1
14 ns2.example. 4242 A 192.0.2.2
18 @ 86400 SOA foo bar 2 2 3 4 5
19 @ 4242 NS ns1.example.
20 @ 4242 NS ns2.example.
21 ns1.example. 4242 A 192.0.2.1
22 ns2.example. 4242 A 192.0.2.2
23 newrecord.example. 8484 A 192.0.2.42
# Module-level AXFR server fixture, built once when this module is imported.
# The test methods in this file reach it through the global name `xfrServer`.
# NOTE(review): `xfrServerPort` and `zones` are defined above this statement
# in the original file; their definitions are not visible in this listing.
xfrServer = AXFRServer(xfrServerPort, zones)
31 class IXFRDistBasicTest(IXFRDistTest
):
33 This test makes sure that we correctly fetch a zone via AXFR, and provide the full AXFR and IXFR
# Maps each zone name to the 'address:port' string of its upstream server.
# NOTE(review): presumably consumed by the IXFRDistTest base class when it
# writes the ixfrdist configuration — confirm against the base class.
_config_domains = {
    'example': '127.0.0.1:' + str(xfrServerPort),
    'example2': '127.0.0.1:1',  # bogus port is intentional
    'example4': '127.0.0.1:' + str(xfrServerPort),
}
def tearDownClass(cls):
    """Release the class-level fixture by delegating to ``tearDownIXFRDist``."""
    # NOTE(review): the `cls` parameter implies a @classmethod decorator on
    # the line before this def; that line is not visible here — confirm.
    cls.tearDownIXFRDist()
def waitUntilCorrectSerialIsLoaded(self, serial, timeout=10):
    """Switch the XFR server to ``serial`` and poll until that serial is served.

    Args:
        serial: zone serial the XFR server should move to and that must
            subsequently be reported by ``xfrServer.getServedSerial()``.
        timeout: maximum number of polling attempts; with the one-second
            pause between attempts this is also the wait budget in seconds.

    Raises:
        AssertionError: if a serial greater than ``serial`` is served, or if
            ``serial`` has not been served once all attempts are used up.
    """
    # Imported locally so the method does not depend on a module-level
    # `import time`.
    import time

    xfrServer.moveToSerial(serial)

    # Pre-bound so the final error message can still be formatted when
    # `timeout` is <= 0 and the loop body never runs.
    servedSerial = None
    attempts = 0
    while attempts < timeout:
        print('attempts=%s timeout=%s' % (attempts, timeout))
        servedSerial = xfrServer.getServedSerial()
        print('servedSerial=%s' % servedSerial)
        if servedSerial > serial:
            raise AssertionError("Expected serial %d, got %d" % (serial, servedSerial))
        if servedSerial == serial:
            # Record one more completed transfer, then stop polling.
            self._xfrDone = self._xfrDone + 1
            # NOTE(review): this early exit and the sleep below sit where the
            # extracted listing had dropped lines — confirm against upstream.
            return

        attempts = attempts + 1
        # One-second pause per attempt, matching the "Waited %d seconds"
        # wording of the timeout message.
        time.sleep(1)

    # Last slot uses %s so the message still formats when nothing was polled.
    raise AssertionError("Waited %d seconds for the serial to be updated to %d but the last served serial is still %s" % (timeout, serial, servedSerial))
def checkFullZone(self, serial):
    """Request an AXFR of ``example.`` and compare it with ``zones[serial]``.

    Args:
        serial: key into the module-level ``zones`` mapping that selects the
            zone text the transfer is expected to contain.

    Raises:
        AssertionError: via ``assertEqual`` when the received answer sections
            differ from the expected ones.
    """
    # FIXME: 90% duplication from _getRecordsForSerial
    # Turn every (name, rdataset) pair of the parsed zone into an RRset.
    # NOTE(review): the statements that bind `zone`, unpack the pair and fill
    # the RRset were missing from the extracted listing and are reconstructed
    # here — confirm against upstream.
    zone = []
    for n, rds in dns.zone.from_text(zones[serial], relativize=False).iterate_rdatasets():
        rrs = dns.rrset.RRset(n, rds.rdclass, rds.rdtype)
        rrs.update(rds)
        zone.append(rrs)

    # AXFRs are SOA-wrapped: first RRset, the sorted remainder, first RRset.
    # NOTE(review): assumes zone[0] is the SOA RRset — confirm the iteration
    # order of the parsed zone guarantees this.
    def sort_key(rrset):
        return (rrset.name, rrset.rdtype)

    expected = [[zone[0]], sorted(zone[1:], key=sort_key), [zone[0]]]

    query = dns.message.make_query('example.', 'AXFR')
    res = self.sendTCPQueryMultiResponse(query, count=len(expected)+1)  # +1 for trailing data check
    answers = [r.answer for r in res]
    # Sort the middle section the same way as `expected` so record order
    # within the transfer does not affect the comparison.
    answers[1].sort(key=sort_key)
    self.assertEqual(answers, expected)
def checkIXFR(self, fromserial, toserial):
    """Request an IXFR of ``example.`` from ``fromserial`` and check the diff.

    Args:
        fromserial: serial whose SOA is placed in the query's authority
            section, i.e. the version the client claims to hold.
        toserial: serial the response is expected to bring the client to.

    Raises:
        AssertionError: via ``assertEqual`` when the received answer sections
            differ from the expected sequence.
    """
    soa1 = xfrServer._getSOAForSerial(fromserial)
    soa2 = xfrServer._getSOAForSerial(toserial)
    # Parse the owner name once instead of once per record in the filter.
    newrecord_name = dns.name.from_text('newrecord.example.')
    newrecord = [r for r in xfrServer._getRecordsForSerial(toserial) if r.name == newrecord_name]
    query = dns.message.make_query('example.', 'IXFR')
    query.authority = [soa1]

    # One answer section per expected response message: new SOA, old SOA
    # (opens the deletions, none here), new SOA (opens the additions), the
    # added record, and the closing new SOA.
    expected = [[soa2], [soa1], [soa2], newrecord, [soa2]]
    res = self.sendTCPQueryMultiResponse(query, count=len(expected)+1)  # +1 for trailing data check
    answers = [r.answer for r in res]

    # No sorting here: the order of the sections is part of what is checked.
    self.assertEqual(answers, expected)
def test_a_XFR(self):
    """Load serial 1 and then serial 2, checking the full zone after each."""
    self.waitUntilCorrectSerialIsLoaded(1)
    self.checkFullZone(1)

    self.waitUntilCorrectSerialIsLoaded(2)
    self.checkFullZone(2)

    # NOTE(review): the listing skips original lines 115-117 here, and
    # checkIXFR() is never called in the visible text — confirm whether an
    # IXFR check from serial 1 to 2 belongs at this point.
# The _b_ prefix makes these tests sort after test_a_XFR, so they run against the post-XFR state.
def test_b_UDP_SOA_existing(self):
    """A UDP SOA query for ``example.`` is answered with the serial-2 SOA."""
    query = dns.message.make_query('example.', 'SOA')
    expected = dns.message.make_response(query)
    expected.answer.append(xfrServer._getSOAForSerial(2))

    response = self.sendUDPQuery(query)
    # assertEqual, not the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(expected, response)
def test_b_UDP_SOA_not_loaded(self):
    """A UDP SOA query for ``example2.`` is answered with REFUSED.

    ``example2`` is the zone configured with the intentionally bogus port.
    """
    query = dns.message.make_query('example2.', 'SOA')
    expected = dns.message.make_response(query)
    expected.set_rcode(dns.rcode.REFUSED)

    response = self.sendUDPQuery(query)
    # assertEqual, not the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(expected, response)
def test_b_UDP_SOA_not_configured(self):
    """A UDP SOA query for ``example3.`` is answered with REFUSED.

    ``example3`` does not appear in ``_config_domains``.
    """
    query = dns.message.make_query('example3.', 'SOA')
    expected = dns.message.make_response(query)
    expected.set_rcode(dns.rcode.REFUSED)

    response = self.sendUDPQuery(query)
    # assertEqual, not the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(expected, response)