]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.ixfrdist/test_IXFR.py
stop crashing on out-of-zone data during inbound AXFR
[thirdparty/pdns.git] / regression-tests.ixfrdist / test_IXFR.py
1 import dns
2 import time
3
4 from ixfrdisttests import IXFRDistTest
5 from xfrserver.xfrserver import AXFRServer
6
7 zones = {
8 1: """
9 $ORIGIN example.
10 @ 86400 SOA foo bar 1 2 3 4 5
11 @ 4242 NS ns1.example.
12 @ 4242 NS ns2.example.
13 ns1.example. 4242 A 192.0.2.1
14 ns2.example. 4242 A 192.0.2.2
15 """,
16 2: """
17 $ORIGIN example.
18 @ 86400 SOA foo bar 2 2 3 4 5
19 @ 4242 NS ns1.example.
20 @ 4242 NS ns2.example.
21 ns1.example. 4242 A 192.0.2.1
22 ns2.example. 4242 A 192.0.2.2
23 newrecord.example. 8484 A 192.0.2.42
24 """
25 }
26
27
28 xfrServerPort = 4244
29 xfrServer = AXFRServer(xfrServerPort, zones)
30
31 class IXFRDistBasicTest(IXFRDistTest):
32 """
33 This test makes sure that we correctly fetch a zone via AXFR, and provide the full AXFR and IXFR
34 """
35
36 global xfrServerPort
37 _xfrDone = 0
38 _config_domains = { 'example': '127.0.0.1:' + str(xfrServerPort),
39 'example2': '127.0.0.1:1', # bogus port is intentional
40 'example4': '127.0.0.1:' + str(xfrServerPort) }
41
42 @classmethod
43 def setUpClass(cls):
44
45 cls.startIXFRDist()
46 cls.setUpSockets()
47
48 @classmethod
49 def tearDownClass(cls):
50 cls.tearDownIXFRDist()
51
52 def waitUntilCorrectSerialIsLoaded(self, serial, timeout=10):
53 global xfrServer
54
55 xfrServer.moveToSerial(serial)
56
57 attempts = 0
58 while attempts < timeout:
59 print('attempts=%s timeout=%s' % (attempts, timeout))
60 servedSerial = xfrServer.getServedSerial()
61 print('servedSerial=%s' % servedSerial)
62 if servedSerial > serial:
63 raise AssertionError("Expected serial %d, got %d" % (serial, servedSerial))
64 if servedSerial == serial:
65 self._xfrDone = self._xfrDone + 1
66 return
67
68 attempts = attempts + 1
69 time.sleep(1)
70
71 raise AssertionError("Waited %d seconds for the serial to be updated to %d but the last served serial is still %d" % (timeout, serial, servedSerial))
72
73 def checkFullZone(self, serial):
74 global zones
75
76 # FIXME: 90% duplication from _getRecordsForSerial
77 zone = []
78 for i in dns.zone.from_text(zones[serial], relativize=False).iterate_rdatasets():
79 n, rds = i
80 rrs=dns.rrset.RRset(n, rds.rdclass, rds.rdtype)
81 rrs.update(rds)
82 zone.append(rrs)
83
84 expected =[[zone[0]], sorted(zone[1:], key=lambda rrset: (rrset.name, rrset.rdtype)), [zone[0]]] # AXFRs are SOA-wrapped
85
86 query = dns.message.make_query('example.', 'AXFR')
87 res = self.sendTCPQueryMultiResponse(query, count=len(expected)+1) # +1 for trailing data check
88 answers = [r.answer for r in res]
89 answers[1].sort(key=lambda rrset: (rrset.name, rrset.rdtype))
90 self.assertEqual(answers, expected)
91
92 def checkIXFR(self, fromserial, toserial):
93 global zones, xfrServer
94
95 ixfr = []
96 soa1 = xfrServer._getSOAForSerial(fromserial)
97 soa2 = xfrServer._getSOAForSerial(toserial)
98 newrecord = [r for r in xfrServer._getRecordsForSerial(toserial) if r.name==dns.name.from_text('newrecord.example.')]
99 query = dns.message.make_query('example.', 'IXFR')
100 query.authority = [soa1]
101
102 expected = [[soa2], [soa1], [soa2], newrecord, [soa2]]
103 res = self.sendTCPQueryMultiResponse(query, count=len(expected)+1) # +1 for trailing data check
104 answers = [r.answer for r in res]
105
106 # answers[1].sort(key=lambda rrset: (rrset.name, rrset.rdtype))
107 self.assertEqual(answers, expected)
108
109 def test_a_XFR(self):
110 self.waitUntilCorrectSerialIsLoaded(1)
111 self.checkFullZone(1)
112
113 self.waitUntilCorrectSerialIsLoaded(2)
114 self.checkFullZone(2)
115
116 self.checkIXFR(1,2)
117
118 # _b_ because we expect post-XFR testing state
119 def test_b_UDP_SOA_existing(self):
120 query = dns.message.make_query('example.', 'SOA')
121 expected = dns.message.make_response(query)
122 expected.answer.append(xfrServer._getSOAForSerial(2))
123
124 response = self.sendUDPQuery(query)
125 self.assertEquals(expected, response)
126
127 def test_b_UDP_SOA_not_loaded(self):
128 query = dns.message.make_query('example2.', 'SOA')
129 expected = dns.message.make_response(query)
130 expected.set_rcode(dns.rcode.REFUSED)
131
132 response = self.sendUDPQuery(query)
133 self.assertEquals(expected, response)
134
135 def test_b_UDP_SOA_not_configured(self):
136 query = dns.message.make_query('example3.', 'SOA')
137 expected = dns.message.make_response(query)
138 expected.set_rcode(dns.rcode.REFUSED)
139
140 response = self.sendUDPQuery(query)
141 self.assertEquals(expected, response)