From: Jeremy Muriel Date: Mon, 1 Sep 2025 14:50:06 +0000 (+0200) Subject: fix case sensitivity handling for IXFR requests in ixfrdist X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=refs%2Fpull%2F16063%2Fhead;p=thirdparty%2Fpdns.git fix case sensitivity handling for IXFR requests in ixfrdist TXT record values are case sensitive, but the compilation of an IXFR difference in ixfrdist doesn't produce a difference when the change is only a case modification. This commit removes the toLower() call when comparing record content to properly handle case-sensitive changes. --- diff --git a/pdns/ixfrutils.hh b/pdns/ixfrutils.hh index da22415f3e..ec546b3468 100644 --- a/pdns/ixfrutils.hh +++ b/pdns/ixfrutils.hh @@ -37,7 +37,7 @@ struct CIContentCompareStruct { bool operator()(const shared_ptr&a, const shared_ptr& b) const { - return toLower(a->getZoneRepresentation()) < toLower(b->getZoneRepresentation()); + return a->getZoneRepresentation() < b->getZoneRepresentation(); } }; diff --git a/regression-tests.ixfrdist/test_CaseSensitiveTXT.py b/regression-tests.ixfrdist/test_CaseSensitiveTXT.py new file mode 100644 index 0000000000..344282210b --- /dev/null +++ b/regression-tests.ixfrdist/test_CaseSensitiveTXT.py @@ -0,0 +1,216 @@ +import dns +import dns.serial +import time +import itertools +import socket + +from ixfrdisttests import IXFRDistTest +from xfrserver.xfrserver import AXFRServer + +zones = { + 1: """ +$ORIGIN ixfr.case. +@ 86400 SOA foo bar 1 2 3 4 5 +@ 4242 NS ns1.ixfr.case. +@ 4242 NS ns2.ixfr.case. +ns1.ixfr.case. 4242 A 192.0.2.1 +ns2.ixfr.case. 4242 A 192.0.2.2 +test.ixfr.case. 1234 TXT "Hello World" +""", + 2: """ +$ORIGIN ixfr.case. +@ 86400 SOA foo bar 2 2 3 4 5 +@ 4242 NS ns1.ixfr.case. +@ 4242 NS ns2.ixfr.case. +ns1.ixfr.case. 4242 A 192.0.2.1 +ns2.ixfr.case. 4242 A 192.0.2.2 +test.ixfr.case. 1234 TXT "hello world" +""", + 3: """ +$ORIGIN ixfr.case. +@ 86400 SOA foo bar 3 2 3 4 5 +@ 4242 NS ns1.ixfr.case. +@ 4242 NS ns2.ixfr.case. +ns1.ixfr.case. 4242 A 192.0.2.1 +ns2.ixfr.case. 4242 A 192.0.2.2 +test.ixfr.case. 1234 TXT "HELLO WORLD" +""", + 4: """ +$ORIGIN ixfr.case. +@ 86400 SOA foo bar 4 2 3 4 5 +@ 4242 NS ns1.ixfr.case. +@ 4242 NS ns2.ixfr.case. +ns1.ixfr.case. 4242 A 192.0.2.1 +ns2.ixfr.case. 4242 A 192.0.2.2 +test.ixfr.case. 1234 TXT "Hello World" +case2.ixfr.case. 1234 TXT "Mixed Case" +case3.ixfr.case. 1234 TXT "mixed case" +""", + 5: """ +$ORIGIN ixfr.case. +@ 86400 SOA foo bar 5 2 3 4 5 +@ 4242 NS ns1.ixfr.case. +@ 4242 NS ns2.ixfr.case. +ns1.ixfr.case. 4242 A 192.0.2.1 +ns2.ixfr.case. 4242 A 192.0.2.2 +test.ixfr.case. 1234 TXT "Hello World" +case2.ixfr.case. 1234 TXT "Mixed Case" +case3.ixfr.case. 1234 TXT "MIXED CASE" +""" +} + +xfrServerPort = 4246 +xfrServer = AXFRServer(xfrServerPort, zones) + +class IXFRDistCaseSensitiveTXTTest(IXFRDistTest): + """ + This test verifies that TXT record comparisons are case-sensitive + after the fix in pdns/ixfrutils.hh where toLower() was removed + from CIContentCompareStruct::operator() + """ + + global xfrServerPort + _xfrDone = 0 + _config_domains = [ + {"domain" : "ixfr.case", "master" : "127.0.0.1:" + str(xfrServerPort)}, + ] + _loaded_serials = [] + + @classmethod + def setUpClass(cls): + cls.startIXFRDist() + cls.setUpSockets() + + @classmethod + def tearDownClass(cls): + cls.tearDownIXFRDist() + + def waitUntilCorrectSerialIsLoaded(self, serial, timeout=10, notify=False): + global xfrServer + + xfrServer.moveToSerial(serial) + + if notify: + notif = dns.message.make_query('ixfr.case.', 'SOA') + notif.set_opcode(dns.opcode.NOTIFY) + notify_response = self.sendUDPQuery(notif) + assert notify_response.rcode() == dns.rcode.NOERROR + + def get_current_serial(): + query = dns.message.make_query('ixfr.case.', 'SOA') + response_message = self.sendUDPQuery(query) + + if response_message.rcode() == dns.rcode.REFUSED: + return 0 + + soa_rrset = response_message.find_rrset(dns.message.ANSWER, dns.name.from_text("ixfr.case."), dns.rdataclass.IN, dns.rdatatype.SOA) + return soa_rrset[0].serial + + attempts = 0 + while attempts < timeout: + print('attempts=%s timeout=%s' % (attempts, timeout)) + servedSerial = get_current_serial() + print('servedSerial=%s' % servedSerial) + if servedSerial > serial: + raise AssertionError("Expected serial %d, got %d" % (serial, servedSerial)) + if servedSerial == serial: + self._xfrDone = self._xfrDone + 1 + self._loaded_serials.append(serial) + return + + attempts = attempts + 1 + time.sleep(1) + + raise AssertionError("Waited %d seconds for the serial to be updated to %d but the last served serial is still %d" % (timeout, serial, servedSerial)) + + def checkTXTRecord(self, expected_txt_records): + """ + Check for the presence of specific TXT records in the zone using AXFR + """ + query = dns.message.make_query('ixfr.case.', 'AXFR') + responses = self.sendTCPQueryMultiResponse(query, count=10) + + found_txt_records = [] + for response in responses: + for rrset in response.answer: + if rrset.rdtype == dns.rdatatype.TXT: + for rr in rrset: + txt_content = str(rr).strip('"') + record_name = str(rrset.name) + found_txt_records.append((record_name, txt_content)) + + for expected_name, expected_txt in expected_txt_records: + found = False + for found_name, found_txt in found_txt_records: + if found_name == expected_name and found_txt == expected_txt: + found = True + break + self.assertTrue(found, + f"TXT record '{expected_name}' with content '{expected_txt}' not found in AXFR. Found: {found_txt_records}") + + def checkIXFRContainsTXTChange(self, fromserial, toserial, expected_removed=None, expected_added=None): + """ + Verify that IXFR properly handles case-sensitive TXT record changes + """ + global xfrServer + + soa_requested = xfrServer._getSOAForSerial(fromserial) + soa_latest = xfrServer._getSOAForSerial(self._loaded_serials[-1]) + + self.assertEqual(soa_latest[0].serial, toserial) + + query = dns.message.make_query('ixfr.case.', 'IXFR') + query.authority = [soa_requested] + + responses = self.sendTCPQueryMultiResponse(query, count=10) # Allow for multiple responses + + # Check if the expected TXT records are in the IXFR response + found_removed = [] + found_added = [] + + for response in responses: + for rrset in response.answer: + if rrset.rdtype == dns.rdatatype.TXT: + for rr in rrset: + txt_content = str(rr).strip('"') + # This is a simplified check - in real IXFR, we'd need to + # analyze the removal/addition sections properly + if expected_removed and txt_content in expected_removed: + found_removed.append(txt_content) + if expected_added and txt_content in expected_added: + found_added.append(txt_content) + + if expected_removed: + for removed_txt in expected_removed: + self.assertIn(removed_txt, found_removed, + f"Expected removed TXT '{removed_txt}' not found in IXFR") + + if expected_added: + for added_txt in expected_added: + self.assertIn(added_txt, found_added, + f"Expected added TXT '{added_txt}' not found in IXFR") + + def test_a_first_version(self): + """Test first version of zone and verify TXT record presence""" + self.waitUntilCorrectSerialIsLoaded(1) + self.checkTXTRecord([('test.ixfr.case.', "Hello World")]) + + def test_b_case_change_lowercase(self): + """Test that changing TXT from 'Hello World' to 'hello world' is detected""" + self.waitUntilCorrectSerialIsLoaded(2) + self.checkIXFRContainsTXTChange(1, 2, expected_removed=["Hello World"], expected_added=["hello world"]) + + def test_c_case_change_uppercase(self): + """Test that changing TXT from 'hello world' to 'HELLO WORLD' is detected""" + self.waitUntilCorrectSerialIsLoaded(3) + self.checkIXFRContainsTXTChange(2, 3, expected_removed=["hello world"], expected_added=["HELLO WORLD"]) + + def test_d_multiple_txt_records_mixed_case(self): + """Test multiple TXT records with different case variations""" + self.waitUntilCorrectSerialIsLoaded(4) + self.checkIXFRContainsTXTChange(3, 4, expected_removed=["HELLO WORLD"], expected_added=["Hello World", "Mixed Case", "mixed case"]) + + def test_e_case_sensitive_update(self): + """Test that updating a TXT record's case is properly handled""" + self.waitUntilCorrectSerialIsLoaded(5) + self.checkIXFRContainsTXTChange(4, 5, expected_removed=["mixed case"], expected_added=["MIXED CASE"])