From: Michal Nowak Date: Mon, 11 May 2026 11:24:22 +0000 (+0200) Subject: Add isctest.transfer.transfer_message() helper and convert tests X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=27ee27d4e3b508f75169e61baad11adc1175cf05;p=thirdparty%2Fbind9.git Add isctest.transfer.transfer_message() helper and convert tests Add a new helper function, isctest.transfer.transfer_message(), to bin/tests/system/isctest/transfer.py that generates the log message produced by xfrin_log() in lib/dns/xfrin.c for an incoming zone transfer: transfer of '/IN' from #: The helper always returns a compiled re.Pattern. source_ns and port each accept None to match any source address / port. msg accepts either a plain str (regex-escaped automatically) or a compiled re.Pattern (spliced into the regex as-is), so callers that need regex syntax in the message part can pass Re(r"...") without having to wrap the whole result. source_ns is passed through re.escape() when provided, so dots in IPv4 addresses (e.g. "10.53.0.1") match a literal dot rather than any character. Convert the existing call sites across the system tests to use the new helper. Co-Authored-By: Nicki Křížek Assisted-by: Claude:claude-sonnet-4-6 Assisted-by: Claude:claude-opus-4-7 --- diff --git a/bin/tests/system/cipher_suites/tests_cipher_suites.py b/bin/tests/system/cipher_suites/tests_cipher_suites.py index c20401bbce0..b934eb3f125 100644 --- a/bin/tests/system/cipher_suites/tests_cipher_suites.py +++ b/bin/tests/system/cipher_suites/tests_cipher_suites.py @@ -9,8 +9,6 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from re import compile as Re - import dns.rcode import pytest @@ -27,8 +25,8 @@ pytestmark = pytest.mark.extra_artifacts( @pytest.fixture(scope="module") def transfers_complete(servers): for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]: - pattern = Re( - f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed" + pattern = isctest.transfer.transfer_message( + zone, "10.53.0.1", "Transfer completed" ) for ns in ["ns2", "ns3", "ns4", "ns5"]: with servers[ns].watch_log_from_start() as watcher: diff --git a/bin/tests/system/ede24/common.py b/bin/tests/system/ede24/common.py index d7dd983e720..13e0ae1f30b 100644 --- a/bin/tests/system/ede24/common.py +++ b/bin/tests/system/ede24/common.py @@ -27,9 +27,13 @@ def check_soa_servfail_ede24(edemsg): isctest.check.ede(res, EDECode.INVALID_DATA, edemsg) -def check_ns2_ready(ns2): +def check_ns2_ready(ns2, named_port): # Sanity check that everything works first, once we're sure the foo.fr zone # has transfered to ns2. with ns2.watch_log_from_start() as watcher: - watcher.wait_for_line("Transfer status: success") + watcher.wait_for_line( + isctest.transfer.transfer_message( + "foo.fr", "10.53.0.1", "Transfer status: success", named_port + ) + ) check_soa_noerror() diff --git a/bin/tests/system/ede24/tests_ede24_expired.py b/bin/tests/system/ede24/tests_ede24_expired.py index 02e129f4ba1..16d2d48fffd 100644 --- a/bin/tests/system/ede24/tests_ede24_expired.py +++ b/bin/tests/system/ede24/tests_ede24_expired.py @@ -13,9 +13,11 @@ import os from ede24.common import check_ns2_ready, check_soa_noerror, check_soa_servfail_ede24 +import isctest -def test_ede24_expired(ns1, ns2): - check_ns2_ready(ns2) + +def test_ede24_expired(named_port, ns1, ns2): + check_ns2_ready(ns2, named_port) # Stop the primary and wait for expiration of the zone in the secondary. with ns2.watch_log_from_here() as watcher: @@ -32,5 +34,9 @@ def test_ede24_expired(ns1, ns2): # Restart the primary and wait for the zone to be back up again. with ns2.watch_log_from_here() as watcher: ns1.start(["--noclean", "--restart", "--port", os.environ["PORT"]]) - watcher.wait_for_line("Transfer status: success") + watcher.wait_for_line( + isctest.transfer.transfer_message( + "foo.fr", "10.53.0.1", "Transfer status: success", named_port + ) + ) check_soa_noerror() diff --git a/bin/tests/system/ede24/tests_ede24_noloaded.py b/bin/tests/system/ede24/tests_ede24_noloaded.py index 90b77b8a556..0b92b63e7be 100644 --- a/bin/tests/system/ede24/tests_ede24_noloaded.py +++ b/bin/tests/system/ede24/tests_ede24_noloaded.py @@ -14,8 +14,8 @@ import os from ede24.common import check_ns2_ready, check_soa_servfail_ede24 -def test_ede24_noloaded(ns1, ns2): - check_ns2_ready(ns2) +def test_ede24_noloaded(named_port, ns1, ns2): + check_ns2_ready(ns2, named_port) # Stop all servers, and we'll restart only ns2. ns1.stop() diff --git a/bin/tests/system/isctest/__init__.py b/bin/tests/system/isctest/__init__.py index b1731f5cd5c..a817eaa4cd5 100644 --- a/bin/tests/system/isctest/__init__.py +++ b/bin/tests/system/isctest/__init__.py @@ -18,6 +18,7 @@ from . import ( # pylint: disable=redefined-builtin query, run, template, + transfer, vars, ) @@ -35,5 +36,6 @@ __all__ = [ "query", "run", "template", + "transfer", "vars", ] diff --git a/bin/tests/system/isctest/transfer.py b/bin/tests/system/isctest/transfer.py new file mode 100644 index 00000000000..95ff5bae743 --- /dev/null +++ b/bin/tests/system/isctest/transfer.py @@ -0,0 +1,47 @@ +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +from re import Pattern + +import re + + +def transfer_message( + zone: str, source_ns: str | None, msg: str | Pattern, port: int | None = None +) -> Pattern: + """Return the expected log message for an incoming zone transfer. + + Mirrors the format produced by xfrin_log() in lib/dns/xfrin.c: + + transfer of '/IN' from #: + + Always returns a compiled Pattern. When source_ns or port is None, + the unknown part is replaced by a wildcard in the regex. + + Args: + zone: Zone name (without class, e.g. "example.com"). + source_ns: Source nameserver IP address string (e.g. "10.53.0.1"), + or None to match any source address. + msg: Transfer status or other message as a plain string + (e.g. "Transfer status: success"), which is regex-escaped, + or a compiled Pattern whose .pattern is spliced in as-is + for callers that need regex syntax in the message part. + port: Source port number, or None to match any port. + """ + source_str = re.escape(source_ns) if source_ns is not None else ".*" + port_str = str(port) if port is not None else "[0-9]+" + msg_str = msg.pattern if isinstance(msg, Pattern) else re.escape(msg) + + return re.compile( + re.escape(f"transfer of '{zone}/IN' from ") + + f"{source_str}#{port_str}: " + + msg_str + ) diff --git a/bin/tests/system/mirror_root_zone/tests_mirror_root_zone.py b/bin/tests/system/mirror_root_zone/tests_mirror_root_zone.py index fd638b88bc8..51e303a5c94 100644 --- a/bin/tests/system/mirror_root_zone/tests_mirror_root_zone.py +++ b/bin/tests/system/mirror_root_zone/tests_mirror_root_zone.py @@ -13,6 +13,8 @@ from isctest.instance import NamedInstance from isctest.mark import live_internet_test +import isctest + @live_internet_test def test_mirror_root_zone(servers: dict[str, NamedInstance]): @@ -23,4 +25,6 @@ def test_mirror_root_zone(servers: dict[str, NamedInstance]): ns1 = servers["ns1"] with ns1.watch_log_from_start() as watch_log: # TimeoutError is raised if the line is not found and the test will fail. - watch_log.wait_for_line("Transfer status: success") + watch_log.wait_for_line( + isctest.transfer.transfer_message(".", None, "Transfer status: success", 53) + ) diff --git a/bin/tests/system/xfer/tests_retransfer_with_force.py b/bin/tests/system/xfer/tests_retransfer_with_force.py index 2ecae569334..0553e0fd0f4 100644 --- a/bin/tests/system/xfer/tests_retransfer_with_force.py +++ b/bin/tests/system/xfer/tests_retransfer_with_force.py @@ -28,7 +28,9 @@ def test_wait_for_zone_retransfer(named_port, ns6): with ns6.watch_log_from_here() as watcher: ns6.rndc("retransfer axfr-rndc-retransfer-force.") watcher.wait_for_line( - f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: received" + isctest.transfer.transfer_message( + "axfr-rndc-retransfer-force", "10.53.0.1", "received", named_port + ) ) @@ -40,11 +42,21 @@ def test_cancel_ongoing_retransfer(named_port, ns6): with ns6.watch_log_from_here() as watcher_transfer_shutting_down: ns6.rndc("retransfer -force axfr-rndc-retransfer-force.") watcher_transfer_shutting_down.wait_for_line( - f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: Transfer status: shutting down" + isctest.transfer.transfer_message( + "axfr-rndc-retransfer-force", + "10.53.0.1", + "Transfer status: shutting down", + named_port, + ) ) isctest.log.info("Wait for the new transfer to complete successfully") watcher_transfer_success.wait_for_line( - f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: Transfer status: success" + isctest.transfer.transfer_message( + "axfr-rndc-retransfer-force", + "10.53.0.1", + "Transfer status: success", + named_port, + ) ) diff --git a/bin/tests/system/xfer/tests_xfer.py b/bin/tests/system/xfer/tests_xfer.py index 6fcfcef78d6..077ffaa333f 100644 --- a/bin/tests/system/xfer/tests_xfer.py +++ b/bin/tests/system/xfer/tests_xfer.py @@ -40,18 +40,26 @@ def send_switch_control_command(command): @pytest.fixture(scope="module", autouse=True) -def after_servers_start(templates, ns4): +def after_servers_start(templates, named_port, ns4): # initial correctly-signed transfer should succeed send_switch_control_command("goodaxfr") with ns4.watch_log_from_here() as watcher: templates.render("ns4/named.conf", {"ns4_as_secondary_for_nil": True}) ns4.reconfigure() - watcher.wait_for_line("Transfer status: success") + watcher.wait_for_line( + isctest.transfer.transfer_message( + "nil", "10.53.0.5", "Transfer status: success", named_port + ) + ) with ns4.watch_log_from_here() as watcher_retransfer_nil_success: ns4.rndc("retransfer nil.") - watcher_retransfer_nil_success.wait_for_line("Transfer status: success") + watcher_retransfer_nil_success.wait_for_line( + isctest.transfer.transfer_message( + "nil", "10.53.0.5", "Transfer status: success", named_port + ) + ) def get_response(msg, server_ip, allow_empty_answer=False): @@ -297,7 +305,7 @@ def test_make_ns4_secondary_for_nil(): check_rdata_in_txt_record("initial AXFR") -def test_handle_ixfr_notimp(ns4): +def test_handle_ixfr_notimp(named_port, ns4): send_switch_control_command("ixfrnotimp") with ns4.watch_log_from_here() as watcher_transfer_success: with ns4.watch_log_from_here() as watcher_requesting_ixfr: @@ -305,76 +313,105 @@ def test_handle_ixfr_notimp(ns4): watcher_requesting_ixfr.wait_for_line( "zone nil/IN: requesting IXFR from 10.53.0.5" ) - watcher_transfer_success.wait_for_line("Transfer status: success") + watcher_transfer_success.wait_for_line( + isctest.transfer.transfer_message( + "nil", "10.53.0.5", "Transfer status: success", named_port + ) + ) check_rdata_in_txt_record("IXFR NOTIMP") @pytest.mark.parametrize( - "command_file,expected_rdata,named_log_line", + "command_file,expected_rdata,named_log_line,xfrin_msg", [ param( "unsigned", "unsigned AXFR", "Transfer status: expected a TSIG or SIG(0)", + True, ), param( "badkeydata", "bad keydata AXFR", "Transfer status: tsig verify failure", + True, ), param( "partial", "partially signed AXFR", "Transfer status: expected a TSIG or SIG(0)", + True, ), param( "unknownkey", "unknown key AXFR", "tsig key 'tsig_key': key name and algorithm do not match", + False, ), param( "wrongkey", "incorrect key AXFR", "tsig key 'tsig_key': key name and algorithm do not match", + False, ), param( "wrongname", "wrong question AXFR", "question name mismatch", + True, ), param( "badmessageid", "bad message id", "Transfer status: unexpected error", + True, ), param( "soamismatch", "SOA mismatch AXFR", "Transfer status: FORMERR", + True, ), ], ) -def test_under_signed_transfer(command_file, expected_rdata, named_log_line, ns4): +def test_under_signed_transfer( + command_file, expected_rdata, named_log_line, xfrin_msg, named_port, ns4 +): send_switch_control_command(command_file) with ns4.watch_log_from_here() as watcher: ns4.rndc("retransfer nil.") - watcher.wait_for_line(named_log_line) + if xfrin_msg: + watcher.wait_for_line( + isctest.transfer.transfer_message( + "nil", "10.53.0.5", named_log_line, named_port + ) + ) + else: + watcher.wait_for_line(named_log_line) check_rdata_in_txt_record(expected_rdata, should_exist=False) -def test_handle_edns_notimp(ns4): +def test_handle_edns_notimp(named_port, ns4): send_switch_control_command("ednsnotimp") with ns4.watch_log_from_here() as watcher: ns4.rndc("retransfer nil.") - watcher.wait_for_line("Transfer status: NOTIMP") + watcher.wait_for_line( + isctest.transfer.transfer_message( + "nil", "10.53.0.5", "Transfer status: NOTIMP", named_port + ) + ) -def test_handle_edns_formerr(ns4): +def test_handle_edns_formerr(named_port, ns4): send_switch_control_command("ednsformerr") with ns4.watch_log_from_here() as watcher: ns4.rndc("retransfer nil.") - watcher.wait_for_line("Transfer status: success") + watcher.wait_for_line( + isctest.transfer.transfer_message( + "nil", "10.53.0.5", "Transfer status: success", named_port + ) + ) check_rdata_in_txt_record("EDNS FORMERR") @@ -436,9 +473,16 @@ def test_mapped_zone(named_port, ns3): # test that a zone with too many records is rejected (AXFR) -def test_axfr_too_many_records(ns6): +def test_axfr_too_many_records(named_port, ns6): with ns6.watch_log_from_start() as watcher: - watcher.wait_for_line(Re("'axfr-too-big/IN'.*: too many records")) + watcher.wait_for_line( + isctest.transfer.transfer_message( + "axfr-too-big", + "10.53.0.1", + "Transfer status: too many records", + named_port, + ) + ) # test that a zone with too many records is rejected (IXFR) @@ -451,7 +495,14 @@ def test_ixfr_too_many_records(named_port, ns6): send """ nsupdate(nsupdate_config) - watcher.wait_for_line("Transfer status: too many records") + watcher.wait_for_line( + isctest.transfer.transfer_message( + "ixfr-too-big", + "10.53.0.1", + "Transfer status: too many records", + named_port, + ) + ) # test that a zone with too many diffs (IXFR) is retried with AXFR @@ -466,7 +517,12 @@ def test_ixfr_too_many_diffs(named_port, ns6): update add the-35th-record.ixfr-too-many-diffs 0 TXT ixfr send """ - log_sequence = ["too many diffs, retrying with AXFR", "Transfer status: success"] + log_sequence = [ + "too many diffs, retrying with AXFR", + isctest.transfer.transfer_message( + "ixfr-too-many-diffs", "10.53.0.1", "Transfer status: success", named_port + ), + ] with ns6.watch_log_from_here() as watcher_transfer_status: nsupdate(nsupdate_config) watcher_transfer_status.wait_for_sequence(log_sequence) @@ -477,10 +533,14 @@ def test_dig_and_named_axfr_stats(named_port, ns3): # Use ns3 logs for checking incoming transfer statistics as ns3 is a # secondary server (for ns1) for "xfer-stats". with ns3.watch_log_from_start() as watcher_transfer_completed: - pattern_transfer_completed = ( - "Transfer completed: 16 messages, 10003 records, 218403 bytes" + watcher_transfer_completed.wait_for_line( + isctest.transfer.transfer_message( + "xfer-stats", + "10.53.0.1", + "Transfer completed: 16 messages, 10003 records, 218403 bytes", + named_port, + ) ) - watcher_transfer_completed.wait_for_line(pattern_transfer_completed) # Loop until the secondary server manages to transfer the "xfer-stats" zone so # that we can both check dig output and immediately proceed with the next test. @@ -508,11 +568,18 @@ def test_dig_and_named_axfr_stats(named_port, ns3): # First, test that named tries the next primary in the list when the first one # fails (XoT -> Do53). Then, test that named tries the next primary in the list # when the first one is already marked as unreachable (XoT -> Do53). -def test_xot_primary_try_next(ns6): +def test_xot_primary_try_next(named_port, ns6): def retransfer_and_check_log(): with ns6.watch_log_from_here(timeout=60) as watcher: ns6.rndc("retransfer xot-primary-try-next.") - watcher.wait_for_line("Transfer status: success") + watcher.wait_for_line( + isctest.transfer.transfer_message( + "xot-primary-try-next", + "10.53.0.1", + "Transfer status: success", + named_port, + ) + ) retransfer_and_check_log() retransfer_and_check_log() diff --git a/bin/tests/system/xfer_servers_list/tests_xfer_servers_list.py b/bin/tests/system/xfer_servers_list/tests_xfer_servers_list.py index cb7553d171a..310ebd4d6c9 100644 --- a/bin/tests/system/xfer_servers_list/tests_xfer_servers_list.py +++ b/bin/tests/system/xfer_servers_list/tests_xfer_servers_list.py @@ -25,9 +25,13 @@ def check_soa(ns, serial): ) -def wait_for_initial_xfrin(ns): +def wait_for_initial_xfrin(ns, named_port): with ns.watch_log_from_start() as watcher: - watcher.wait_for_line("Transfer status: success") + watcher.wait_for_line( + isctest.transfer.transfer_message( + "test", "10.53.0.1", "Transfer status: success", named_port + ) + ) check_soa(ns, 1) @@ -39,11 +43,11 @@ def wait_for_sending_notify(ns1, ns, key_name): watcher.wait_for_line(pattern) -def test_xfer_servers_list(ns1, ns2, ns3, ns4, templates): - # First, wait for ns2, ns3 and ns4 to xfrin foo.fr and answer it - wait_for_initial_xfrin(ns2) - wait_for_initial_xfrin(ns3) - wait_for_initial_xfrin(ns4) +def test_xfer_servers_list(named_port, ns1, ns2, ns3, ns4, templates): + # First, wait for ns2, ns3 and ns4 to xfrin test. and answer it + wait_for_initial_xfrin(ns2, named_port) + wait_for_initial_xfrin(ns3, named_port) + wait_for_initial_xfrin(ns4, named_port) # ns1 initially notifies the secondaries using the respectively configured keys # - 10.53.0.2 has the key defined where `secondaries` is used @@ -56,13 +60,25 @@ def test_xfer_servers_list(ns1, ns2, ns3, ns4, templates): for ns, key_name in seq: wait_for_sending_notify(ns1, ns, key_name) - # Then, ns1 update foo.fr. It notifies ns2, ns3 and ns4 about it + # Then, ns1 update test. It notifies ns2, ns3 and ns4 about it templates.render("ns1/test.db", {"serial": 2}) with ns2.watch_log_from_here() as ns2_watcher, ns3.watch_log_from_here() as ns3_watcher, ns4.watch_log_from_here() as ns4_watcher: ns1.rndc("reload") - ns2_watcher.wait_for_line("Transfer status: success") - ns3_watcher.wait_for_line("Transfer status: success") - ns4_watcher.wait_for_line("Transfer status: success") + ns2_watcher.wait_for_line( + isctest.transfer.transfer_message( + "test", "10.53.0.1", "Transfer status: success", named_port + ) + ) + ns3_watcher.wait_for_line( + isctest.transfer.transfer_message( + "test", "10.53.0.1", "Transfer status: success", named_port + ) + ) + ns4_watcher.wait_for_line( + isctest.transfer.transfer_message( + "test", "10.53.0.1", "Transfer status: success", named_port + ) + ) check_soa(ns2, 2) check_soa(ns3, 2) check_soa(ns4, 2) diff --git a/bin/tests/system/xferquota/tests_xferquota.py b/bin/tests/system/xferquota/tests_xferquota.py index 6d255085cb5..2a95a6c1a8a 100644 --- a/bin/tests/system/xferquota/tests_xferquota.py +++ b/bin/tests/system/xferquota/tests_xferquota.py @@ -73,9 +73,8 @@ def test_xferquota(named_port, ns1, ns2): isctest.check.rrsets_equal(ns1response.answer, ns2response.answer) query_and_compare(axfr_msg) - pattern = Re( - f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: " - f"Transfer completed: .*\\(serial 2\\)" + pattern = isctest.transfer.transfer_message( + "changing", "10.53.0.1", Re(r"Transfer completed: .*\(serial 2\)"), named_port ) with ns2.watch_log_from_start(timeout=30) as watcher: watcher.wait_for_line(pattern)