]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.ixfrdist/test_IXFR.py
Merge pull request #7702 from rgacogne/dnsdist-static-fixes
[thirdparty/pdns.git] / regression-tests.ixfrdist / test_IXFR.py
1 import dns
2 import time
3
4 from ixfrdisttests import IXFRDistTest
5 from xfrserver.xfrserver import AXFRServer
6
# Zone contents handed to the test AXFR server, keyed by SOA serial.
zones = {
    # Serial 1: SOA, two NS records and the two nameserver addresses.
    1: """
$ORIGIN example.
@ 86400 SOA foo bar 1 2 3 4 5
@ 4242 NS ns1.example.
@ 4242 NS ns2.example.
ns1.example. 4242 A 192.0.2.1
ns2.example. 4242 A 192.0.2.2
""",
    # Serial 2: same records as serial 1 plus newrecord.example. (with its own TTL).
    2: """
$ORIGIN example.
@ 86400 SOA foo bar 2 2 3 4 5
@ 4242 NS ns1.example.
@ 4242 NS ns2.example.
ns1.example. 4242 A 192.0.2.1
ns2.example. 4242 A 192.0.2.2
newrecord.example. 8484 A 192.0.2.42
""",
}


# Port given to the test AXFR server; the test class below configures ixfrdist
# to use 127.0.0.1:<this port> as the source for its zones.
xfrServerPort = 4244
xfrServer = AXFRServer(xfrServerPort, zones)
30
class IXFRDistBasicTest(IXFRDistTest):
    """
    This test makes sure that we correctly fetch a zone via AXFR, and provide the full AXFR and IXFR
    """

    # Incremented each time waitUntilCorrectSerialIsLoaded() observes the requested serial being served.
    _xfrDone = 0
    _config_domains = { 'example': '127.0.0.1:' + str(xfrServerPort), # zone for actual XFR testing
                        'example2': '127.0.0.1:1', # bogus port is intentional - zone is intentionally unloadable
                        # example3 # intentionally absent for 'unconfigured zone' testing
                        'example4': '127.0.0.1:' + str(xfrServerPort) } # for testing how ixfrdist deals with getting the wrong zone on XFR

    @classmethod
    def setUpClass(cls):
        """Start ixfrdist and set up the sockets used by the query helpers."""
        cls.startIXFRDist()
        cls.setUpSockets()

    @classmethod
    def tearDownClass(cls):
        """Stop the ixfrdist instance started in setUpClass()."""
        cls.tearDownIXFRDist()

    def waitUntilCorrectSerialIsLoaded(self, serial, timeout=10):
        """
        Move the test XFR server to `serial` and wait until that serial has been served.

        The served serial is polled once per second, at most `timeout` times.
        Raises AssertionError if a higher serial than requested has been served,
        or if the requested serial has not been served after `timeout` polls.
        """
        xfrServer.moveToSerial(serial)

        # Pre-bound so the failure message below is still well-formed when no
        # poll happens at all (timeout <= 0).
        servedSerial = None
        attempts = 0
        while attempts < timeout:
            print('attempts=%s timeout=%s' % (attempts, timeout))
            servedSerial = xfrServer.getServedSerial()
            print('servedSerial=%s' % servedSerial)
            if servedSerial > serial:
                raise AssertionError("Expected serial %d, got %d" % (serial, servedSerial))
            if servedSerial == serial:
                self._xfrDone += 1
                return

            attempts += 1
            time.sleep(1)

        raise AssertionError("Waited %d seconds for the serial to be updated to %d but the last served serial is still %s" % (timeout, serial, servedSerial))

    def checkFullZone(self, serial):
        """
        Request an AXFR of 'example.' over TCP and compare it with zones[serial].

        The expected transfer is three messages: the SOA, all other rrsets, and
        the SOA again. The middle message is sorted on both sides before the
        comparison, so record order inside it does not matter.
        """
        # FIXME: 90% duplication from _getRecordsForSerial
        zone = []
        for name, rdataset in dns.zone.from_text(zones[serial], relativize=False).iterate_rdatasets():
            rrset = dns.rrset.RRset(name, rdataset.rdclass, rdataset.rdtype)
            rrset.update(rdataset)
            zone.append(rrset)

        def sortKey(rrset):
            return (rrset.name, rrset.rdtype)

        # AXFRs are SOA-wrapped; zone[0] is used as the SOA.
        expected = [[zone[0]], sorted(zone[1:], key=sortKey), [zone[0]]]

        query = dns.message.make_query('example.', 'AXFR')
        res = self.sendTCPQueryMultiResponse(query, count=len(expected)+1) # +1 for trailing data check
        answers = [r.answer for r in res]
        answers[1].sort(key=sortKey)
        self.assertEqual(answers, expected)

    def checkIXFR(self, fromserial, toserial):
        """
        Request an IXFR of 'example.' from `fromserial` over TCP and check the response.

        The expected response is five messages: new SOA, old SOA, new SOA,
        the 'newrecord.example.' rrsets of `toserial`, and the new SOA again.
        TTLs are compared explicitly after the equality check.
        """
        soa1 = xfrServer._getSOAForSerial(fromserial)
        soa2 = xfrServer._getSOAForSerial(toserial)
        newrecord = [r for r in xfrServer._getRecordsForSerial(toserial) if r.name==dns.name.from_text('newrecord.example.')]
        query = dns.message.make_query('example.', 'IXFR')
        query.authority = [soa1]

        expected = [[soa2], [soa1], [soa2], newrecord, [soa2]]
        res = self.sendTCPQueryMultiResponse(query, count=len(expected)+1) # +1 for trailing data check
        answers = [r.answer for r in res]

        self.assertEqual(answers, expected)
        # check the TTLs (the list comparison above guarantees matching lengths)
        for expectedAnswer, answer in zip(expected, answers):
            for expectedRecord, record in zip(expectedAnswer, answer):
                self.assertEqual(expectedRecord.ttl, record.ttl)

    def _checkUDPSOARefused(self, zone):
        """Send a SOA query for `zone` over UDP and expect a REFUSED response."""
        query = dns.message.make_query(zone, 'SOA')
        expected = dns.message.make_response(query)
        expected.set_rcode(dns.rcode.REFUSED)

        response = self.sendUDPQuery(query)
        self.assertEqual(expected, response)

    def test_a_XFR(self):
        """Load serial 1 then serial 2, checking the AXFR after each, then the IXFR from 1 to 2."""
        self.waitUntilCorrectSerialIsLoaded(1)
        self.checkFullZone(1)

        self.waitUntilCorrectSerialIsLoaded(2)
        self.checkFullZone(2)

        self.checkIXFR(1,2)

    # _b_ because we expect post-XFR testing state
    def test_b_UDP_SOA_existing(self):
        """A UDP SOA query for the loaded zone returns the serial-2 SOA with matching TTL."""
        query = dns.message.make_query('example.', 'SOA')
        expected = dns.message.make_response(query)
        expected.answer.append(xfrServer._getSOAForSerial(2))

        response = self.sendUDPQuery(query)
        self.assertEqual(expected, response)
        # check the TTLs
        self.assertEqual(len(expected.answer), len(response.answer))
        for expectedRecord, record in zip(expected.answer, response.answer):
            self.assertEqual(expectedRecord.ttl, record.ttl)

    def test_b_UDP_SOA_not_loaded(self):
        """A UDP SOA query for the configured but unloadable zone is refused."""
        self._checkUDPSOARefused('example2.')

    def test_b_UDP_SOA_not_configured(self):
        """A UDP SOA query for a zone that is not configured is refused."""
        self._checkUDPSOARefused('example3.')