]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Add isctest.transfer.transfer_message() helper and convert tests 11995/head
authorMichal Nowak <mnowak@isc.org>
Mon, 11 May 2026 11:24:22 +0000 (13:24 +0200)
committerMichal Nowak <mnowak@isc.org>
Mon, 11 May 2026 14:30:44 +0000 (16:30 +0200)
Add a new helper function, isctest.transfer.transfer_message(), to
bin/tests/system/isctest/transfer.py that generates the log message
produced by xfrin_log() in lib/dns/xfrin.c for an incoming zone
transfer:

    transfer of '<zone>/IN' from <source_ns>#<port>: <msg>

The helper always returns a compiled re.Pattern.  source_ns and port
each accept None to match any source address / port.  msg accepts
either a plain str (regex-escaped automatically) or a compiled
re.Pattern (spliced into the regex as-is), so callers that need regex
syntax in the message part can pass Re(r"...") without having to
wrap the whole result.

source_ns is passed through re.escape() when provided, so dots in
IPv4 addresses (e.g. "10.53.0.1") match a literal dot rather than
any character.

Convert the existing call sites across the system tests to use the
new helper.

Co-Authored-By: Nicki Křížek <nicki@isc.org>
Assisted-by: Claude:claude-sonnet-4-6
Assisted-by: Claude:claude-opus-4-7
(cherry picked from commit 27ee27d4e3b508f75169e61baad11adc1175cf05)

bin/tests/system/cipher_suites/tests_cipher_suites.py
bin/tests/system/dialup/tests_dialup_zone_transfer.py
bin/tests/system/isctest/__init__.py
bin/tests/system/isctest/transfer.py [new file with mode: 0644]
bin/tests/system/mirror_root_zone/tests_mirror_root_zone.py
bin/tests/system/xfer/tests_retransfer_with_force.py
bin/tests/system/xfer/tests_xfer.py
bin/tests/system/xfer_servers_list/tests_xfer_servers_list.py
bin/tests/system/xferquota/tests_xferquota.py

index c20401bbce04cf598c2c199cfb81aa78cbe87b81..b934eb3f1252c7b729cf720da9bc3d70f7e6caac 100644 (file)
@@ -9,8 +9,6 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-from re import compile as Re
-
 import dns.rcode
 import pytest
 
@@ -27,8 +25,8 @@ pytestmark = pytest.mark.extra_artifacts(
 @pytest.fixture(scope="module")
 def transfers_complete(servers):
     for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]:
-        pattern = Re(
-            f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed"
+        pattern = isctest.transfer.transfer_message(
+            zone, "10.53.0.1", "Transfer completed"
         )
         for ns in ["ns2", "ns3", "ns4", "ns5"]:
             with servers[ns].watch_log_from_start() as watcher:
index 25d55f9f933de7164f29eb36de8e917aa7c6d8ef..7a061bbcaf0b948f7488a696ea22e337a857c8a7 100644 (file)
@@ -32,7 +32,9 @@ def test_dialup_zone_transfer(named_port, servers, ns):
     ns1response = isctest.query.tcp(msg, "10.53.0.1")
     with servers[f"ns{ns}"].watch_log_from_start(timeout=90) as watcher:
         watcher.wait_for_line(
-            f"transfer of 'example/IN' from 10.53.0.{ns-1}#{named_port}: Transfer status: success",
+            isctest.transfer.transfer_message(
+                "example", f"10.53.0.{ns-1}", "Transfer status: success", named_port
+            )
         )
     response = isctest.query.tcp(msg, f"10.53.0.{ns}")
     if response.rcode() != dns.rcode.SERVFAIL:
index b1731f5cd5c6f1e9399d9608d92f6244c09d1207..a817eaa4cd502565ff3080c9d0fdaaa2f77f42b4 100644 (file)
@@ -18,6 +18,7 @@ from . import (  # pylint: disable=redefined-builtin
     query,
     run,
     template,
+    transfer,
     vars,
 )
 
@@ -35,5 +36,6 @@ __all__ = [
     "query",
     "run",
     "template",
+    "transfer",
     "vars",
 ]
diff --git a/bin/tests/system/isctest/transfer.py b/bin/tests/system/isctest/transfer.py
new file mode 100644 (file)
index 0000000..95ff5ba
--- /dev/null
@@ -0,0 +1,47 @@
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0.  If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+from re import Pattern
+
+import re
+
+
+def transfer_message(
+    zone: str, source_ns: str | None, msg: str | Pattern, port: int | None = None
+) -> Pattern:
+    """Return the expected log message for an incoming zone transfer.
+
+    Mirrors the format produced by xfrin_log() in lib/dns/xfrin.c:
+
+        transfer of '<zone>/IN' from <source_ns>#<port>: <msg>
+
+    Always returns a compiled Pattern.  When source_ns or port is None,
+    the unknown part is replaced by a wildcard in the regex.
+
+    Args:
+        zone:      Zone name (without class, e.g. "example.com").
+        source_ns: Source nameserver IP address string (e.g. "10.53.0.1"),
+                   or None to match any source address.
+        msg:       Transfer status or other message as a plain string
+                   (e.g. "Transfer status: success"), which is regex-escaped,
+                   or a compiled Pattern whose .pattern is spliced in as-is
+                   for callers that need regex syntax in the message part.
+        port:      Source port number, or None to match any port.
+    """
+    source_str = re.escape(source_ns) if source_ns is not None else ".*"
+    port_str = str(port) if port is not None else "[0-9]+"
+    msg_str = msg.pattern if isinstance(msg, Pattern) else re.escape(msg)
+
+    return re.compile(
+        re.escape(f"transfer of '{zone}/IN' from ")
+        + f"{source_str}#{port_str}: "
+        + msg_str
+    )
index 4a5c52d9c1d766464444d85bc7ef95c35e1e20fd..59767fae4f5c408e74f01f86dd3fb6ea4637d3cb 100644 (file)
@@ -12,6 +12,8 @@
 from isctest.instance import NamedInstance
 from isctest.mark import live_internet_test
 
+import isctest
+
 
 @live_internet_test
 def test_mirror_root_zone(ns1: NamedInstance):
@@ -21,4 +23,6 @@ def test_mirror_root_zone(ns1: NamedInstance):
     """
     with ns1.watch_log_from_start() as watch_log:
         # TimeoutError is raised if the line is not found and the test will fail.
-        watch_log.wait_for_line("Transfer status: success")
+        watch_log.wait_for_line(
+            isctest.transfer.transfer_message(".", None, "Transfer status: success", 53)
+        )
index 2ecae569334fc51ef4cef5d036b5b1ab34631277..0553e0fd0f4ba54b63716ab57c0ff919c229dbd9 100644 (file)
@@ -28,7 +28,9 @@ def test_wait_for_zone_retransfer(named_port, ns6):
     with ns6.watch_log_from_here() as watcher:
         ns6.rndc("retransfer axfr-rndc-retransfer-force.")
         watcher.wait_for_line(
-            f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: received"
+            isctest.transfer.transfer_message(
+                "axfr-rndc-retransfer-force", "10.53.0.1", "received", named_port
+            )
         )
 
 
@@ -40,11 +42,21 @@ def test_cancel_ongoing_retransfer(named_port, ns6):
         with ns6.watch_log_from_here() as watcher_transfer_shutting_down:
             ns6.rndc("retransfer -force axfr-rndc-retransfer-force.")
             watcher_transfer_shutting_down.wait_for_line(
-                f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: Transfer status: shutting down"
+                isctest.transfer.transfer_message(
+                    "axfr-rndc-retransfer-force",
+                    "10.53.0.1",
+                    "Transfer status: shutting down",
+                    named_port,
+                )
             )
         isctest.log.info("Wait for the new transfer to complete successfully")
         watcher_transfer_success.wait_for_line(
-            f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: Transfer status: success"
+            isctest.transfer.transfer_message(
+                "axfr-rndc-retransfer-force",
+                "10.53.0.1",
+                "Transfer status: success",
+                named_port,
+            )
         )
 
 
index b87a75fe7bf54330508d3067e94456defdd925ab..2adb5ea2b972213eb3c8c67592a3ba809ac30dd0 100644 (file)
@@ -40,18 +40,26 @@ def send_switch_control_command(command):
 
 
 @pytest.fixture(scope="module", autouse=True)
-def after_servers_start(templates, ns4):
+def after_servers_start(templates, named_port, ns4):
     # initial correctly-signed transfer should succeed
     send_switch_control_command("goodaxfr")
 
     with ns4.watch_log_from_here() as watcher:
         templates.render("ns4/named.conf", {"ns4_as_secondary_for_nil": True})
         ns4.reconfigure()
-        watcher.wait_for_line("Transfer status: success")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "nil", "10.53.0.5", "Transfer status: success", named_port
+            )
+        )
 
     with ns4.watch_log_from_here() as watcher_retransfer_nil_success:
         ns4.rndc("retransfer nil.")
-        watcher_retransfer_nil_success.wait_for_line("Transfer status: success")
+        watcher_retransfer_nil_success.wait_for_line(
+            isctest.transfer.transfer_message(
+                "nil", "10.53.0.5", "Transfer status: success", named_port
+            )
+        )
 
 
 def get_response(msg, server_ip, allow_empty_answer=False):
@@ -297,7 +305,7 @@ def test_make_ns4_secondary_for_nil():
     check_rdata_in_txt_record("initial AXFR")
 
 
-def test_handle_ixfr_notimp(ns4):
+def test_handle_ixfr_notimp(named_port, ns4):
     send_switch_control_command("ixfrnotimp")
     with ns4.watch_log_from_here() as watcher_transfer_success:
         with ns4.watch_log_from_here() as watcher_requesting_ixfr:
@@ -305,76 +313,105 @@ def test_handle_ixfr_notimp(ns4):
             watcher_requesting_ixfr.wait_for_line(
                 "zone nil/IN: requesting IXFR from 10.53.0.5"
             )
-        watcher_transfer_success.wait_for_line("Transfer status: success")
+        watcher_transfer_success.wait_for_line(
+            isctest.transfer.transfer_message(
+                "nil", "10.53.0.5", "Transfer status: success", named_port
+            )
+        )
 
     check_rdata_in_txt_record("IXFR NOTIMP")
 
 
 @pytest.mark.parametrize(
-    "command_file,expected_rdata,named_log_line",
+    "command_file,expected_rdata,named_log_line,xfrin_msg",
     [
         param(
             "unsigned",
             "unsigned AXFR",
             "Transfer status: expected a TSIG or SIG(0)",
+            True,
         ),
         param(
             "badkeydata",
             "bad keydata AXFR",
             "Transfer status: tsig verify failure",
+            True,
         ),
         param(
             "partial",
             "partially signed AXFR",
             "Transfer status: expected a TSIG or SIG(0)",
+            True,
         ),
         param(
             "unknownkey",
             "unknown key AXFR",
             "tsig key 'tsig_key': key name and algorithm do not match",
+            False,
         ),
         param(
             "wrongkey",
             "incorrect key AXFR",
             "tsig key 'tsig_key': key name and algorithm do not match",
+            False,
         ),
         param(
             "wrongname",
             "wrong question AXFR",
             "question name mismatch",
+            True,
         ),
         param(
             "badmessageid",
             "bad message id",
             "Transfer status: unexpected error",
+            True,
         ),
         param(
             "soamismatch",
             "SOA mismatch AXFR",
             "Transfer status: FORMERR",
+            True,
         ),
     ],
 )
-def test_under_signed_transfer(command_file, expected_rdata, named_log_line, ns4):
+def test_under_signed_transfer(
+    command_file, expected_rdata, named_log_line, xfrin_msg, named_port, ns4
+):
     send_switch_control_command(command_file)
     with ns4.watch_log_from_here() as watcher:
         ns4.rndc("retransfer nil.")
-        watcher.wait_for_line(named_log_line)
+        if xfrin_msg:
+            watcher.wait_for_line(
+                isctest.transfer.transfer_message(
+                    "nil", "10.53.0.5", named_log_line, named_port
+                )
+            )
+        else:
+            watcher.wait_for_line(named_log_line)
     check_rdata_in_txt_record(expected_rdata, should_exist=False)
 
 
-def test_handle_edns_notimp(ns4):
+def test_handle_edns_notimp(named_port, ns4):
     send_switch_control_command("ednsnotimp")
     with ns4.watch_log_from_here() as watcher:
         ns4.rndc("retransfer nil.")
-        watcher.wait_for_line("Transfer status: NOTIMP")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "nil", "10.53.0.5", "Transfer status: NOTIMP", named_port
+            )
+        )
 
 
-def test_handle_edns_formerr(ns4):
+def test_handle_edns_formerr(named_port, ns4):
     send_switch_control_command("ednsformerr")
     with ns4.watch_log_from_here() as watcher:
         ns4.rndc("retransfer nil.")
-        watcher.wait_for_line("Transfer status: success")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "nil", "10.53.0.5", "Transfer status: success", named_port
+            )
+        )
     check_rdata_in_txt_record("EDNS FORMERR")
 
 
@@ -436,9 +473,16 @@ def test_mapped_zone(named_port, ns3):
 
 
 # test that a zone with too many records is rejected (AXFR)
-def test_axfr_too_many_records(ns6):
+def test_axfr_too_many_records(named_port, ns6):
     with ns6.watch_log_from_start() as watcher:
-        watcher.wait_for_line(Re("'axfr-too-big/IN'.*: too many records"))
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "axfr-too-big",
+                "10.53.0.1",
+                "Transfer status: too many records",
+                named_port,
+            )
+        )
 
 
 # test that a zone with too many records is rejected (IXFR)
@@ -451,7 +495,14 @@ def test_ixfr_too_many_records(named_port, ns6):
         send
         """
         nsupdate(nsupdate_config)
-        watcher.wait_for_line("Transfer status: too many records")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "ixfr-too-big",
+                "10.53.0.1",
+                "Transfer status: too many records",
+                named_port,
+            )
+        )
 
 
 # checking whether dig calculates AXFR statistics correctly
@@ -459,10 +510,14 @@ def test_dig_and_named_axfr_stats(named_port, ns3):
     # Use ns3 logs for checking incoming transfer statistics as ns3 is a
     # secondary server (for ns1) for "xfer-stats".
     with ns3.watch_log_from_start() as watcher_transfer_completed:
-        pattern_transfer_completed = (
-            "Transfer completed: 16 messages, 10003 records, 218403 bytes"
+        watcher_transfer_completed.wait_for_line(
+            isctest.transfer.transfer_message(
+                "xfer-stats",
+                "10.53.0.1",
+                "Transfer completed: 16 messages, 10003 records, 218403 bytes",
+                named_port,
+            )
         )
-        watcher_transfer_completed.wait_for_line(pattern_transfer_completed)
 
     # Loop until the secondary server manages to transfer the "xfer-stats" zone so
     # that we can both check dig output and immediately proceed with the next test.
@@ -497,11 +552,18 @@ def test_transfer_source_option_uses_port_option_correctly(ns6):
 # First, test that named tries the next primary in the list when the first one
 # fails (XoT -> Do53). Then, test that named tries the next primary in the list
 # when the first one is already marked as unreachable (XoT -> Do53).
-def test_xot_primary_try_next(ns6):
+def test_xot_primary_try_next(named_port, ns6):
     def retransfer_and_check_log():
         with ns6.watch_log_from_here(timeout=60) as watcher:
             ns6.rndc("retransfer xot-primary-try-next.")
-            watcher.wait_for_line("Transfer status: success")
+            watcher.wait_for_line(
+                isctest.transfer.transfer_message(
+                    "xot-primary-try-next",
+                    "10.53.0.1",
+                    "Transfer status: success",
+                    named_port,
+                )
+            )
 
     retransfer_and_check_log()
     retransfer_and_check_log()
index 54cf2fcaf23c4d7430a165d0dc9a53103cb76b98..1b7d962a79a623138e79b46569619be03761566a 100644 (file)
@@ -25,9 +25,13 @@ def check_soa(ns, serial):
     )
 
 
-def wait_for_initial_xfrin(ns):
+def wait_for_initial_xfrin(ns, named_port):
     with ns.watch_log_from_start() as watcher:
-        watcher.wait_for_line("Transfer status: success")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "test", "10.53.0.1", "Transfer status: success", named_port
+            )
+        )
     check_soa(ns, 1)
 
 
@@ -39,11 +43,11 @@ def wait_for_sending_notify(ns1, ns, key_name):
         watcher.wait_for_line(pattern)
 
 
-def test_xfer_servers_list(ns1, ns2, ns3, ns4, templates):
-    # First, wait for ns2, ns3 and ns4 to xfrin foo.fr and answer it
-    wait_for_initial_xfrin(ns2)
-    wait_for_initial_xfrin(ns3)
-    wait_for_initial_xfrin(ns4)
+def test_xfer_servers_list(named_port, ns1, ns2, ns3, ns4, templates):
+    # First, wait for ns2, ns3 and ns4 to xfrin test. and answer it
+    wait_for_initial_xfrin(ns2, named_port)
+    wait_for_initial_xfrin(ns3, named_port)
+    wait_for_initial_xfrin(ns4, named_port)
 
     # ns1 initially notifies the secondaries using the respectively configured keys
     # - 10.53.0.2 has the key defined where `secondaries` is used
@@ -56,13 +60,25 @@ def test_xfer_servers_list(ns1, ns2, ns3, ns4, templates):
     for ns, key_name in seq:
         wait_for_sending_notify(ns1, ns, key_name)
 
-    # Then, ns1 update foo.fr. It notifies ns2, ns3 and ns4 about it
+    # Then, ns1 update test. It notifies ns2, ns3 and ns4 about it
     templates.render("ns1/test.db", {"serial": 2})
     with ns2.watch_log_from_here() as ns2_watcher, ns3.watch_log_from_here() as ns3_watcher, ns4.watch_log_from_here() as ns4_watcher:
         ns1.rndc("reload")
-        ns2_watcher.wait_for_line("Transfer status: success")
-        ns3_watcher.wait_for_line("Transfer status: success")
-        ns4_watcher.wait_for_line("Transfer status: success")
+        ns2_watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "test", "10.53.0.1", "Transfer status: success", named_port
+            )
+        )
+        ns3_watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "test", "10.53.0.1", "Transfer status: success", named_port
+            )
+        )
+        ns4_watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "test", "10.53.0.1", "Transfer status: success", named_port
+            )
+        )
     check_soa(ns2, 2)
     check_soa(ns3, 2)
     check_soa(ns4, 2)
index 6d255085cb558638bab65579a746d4e837f3d2bb..2a95a6c1a8a444a596a4a71b651956436f4029e0 100644 (file)
@@ -73,9 +73,8 @@ def test_xferquota(named_port, ns1, ns2):
         isctest.check.rrsets_equal(ns1response.answer, ns2response.answer)
 
     query_and_compare(axfr_msg)
-    pattern = Re(
-        f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
-        f"Transfer completed: .*\\(serial 2\\)"
+    pattern = isctest.transfer.transfer_message(
+        "changing", "10.53.0.1", Re(r"Transfer completed: .*\(serial 2\)"), named_port
     )
     with ns2.watch_log_from_start(timeout=30) as watcher:
         watcher.wait_for_line(pattern)