]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Add isctest.transfer.transfer_message() helper and convert tests 11796/head
authorMichal Nowak <mnowak@isc.org>
Mon, 11 May 2026 11:24:22 +0000 (13:24 +0200)
committerMichal Nowak <mnowak@isc.org>
Mon, 11 May 2026 13:31:41 +0000 (15:31 +0200)
Add a new helper function, isctest.transfer.transfer_message(), to
bin/tests/system/isctest/transfer.py that generates the log message
produced by xfrin_log() in lib/dns/xfrin.c for an incoming zone
transfer:

    transfer of '<zone>/IN' from <source_ns>#<port>: <msg>

The helper always returns a compiled re.Pattern.  source_ns and port
each accept None to match any source address / port.  msg accepts
either a plain str (regex-escaped automatically) or a compiled
re.Pattern (spliced into the regex as-is), so callers that need regex
syntax in the message part can pass Re(r"...") without having to
wrap the whole result.

source_ns is passed through re.escape() when provided, so dots in
IPv4 addresses (e.g. "10.53.0.1") match a literal dot rather than
any character.

Convert the existing call sites across the system tests to use the
new helper.

Co-Authored-By: Nicki Křížek <nicki@isc.org>
Assisted-by: Claude:claude-sonnet-4-6
Assisted-by: Claude:claude-opus-4-7
bin/tests/system/cipher_suites/tests_cipher_suites.py
bin/tests/system/ede24/common.py
bin/tests/system/ede24/tests_ede24_expired.py
bin/tests/system/ede24/tests_ede24_noloaded.py
bin/tests/system/isctest/__init__.py
bin/tests/system/isctest/transfer.py [new file with mode: 0644]
bin/tests/system/mirror_root_zone/tests_mirror_root_zone.py
bin/tests/system/xfer/tests_retransfer_with_force.py
bin/tests/system/xfer/tests_xfer.py
bin/tests/system/xfer_servers_list/tests_xfer_servers_list.py
bin/tests/system/xferquota/tests_xferquota.py

index c20401bbce04cf598c2c199cfb81aa78cbe87b81..b934eb3f1252c7b729cf720da9bc3d70f7e6caac 100644 (file)
@@ -9,8 +9,6 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-from re import compile as Re
-
 import dns.rcode
 import pytest
 
@@ -27,8 +25,8 @@ pytestmark = pytest.mark.extra_artifacts(
 @pytest.fixture(scope="module")
 def transfers_complete(servers):
     for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]:
-        pattern = Re(
-            f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed"
+        pattern = isctest.transfer.transfer_message(
+            zone, "10.53.0.1", "Transfer completed"
         )
         for ns in ["ns2", "ns3", "ns4", "ns5"]:
             with servers[ns].watch_log_from_start() as watcher:
index d7dd983e720aa0b342b101c0777ed3fbc11bdf71..13e0ae1f30b363fdd8b7364cca4f4e45f0c07cc6 100644 (file)
@@ -27,9 +27,13 @@ def check_soa_servfail_ede24(edemsg):
     isctest.check.ede(res, EDECode.INVALID_DATA, edemsg)
 
 
-def check_ns2_ready(ns2):
+def check_ns2_ready(ns2, named_port):
     # Sanity check that everything works first, once we're sure the foo.fr zone
     # has transfered to ns2.
     with ns2.watch_log_from_start() as watcher:
-        watcher.wait_for_line("Transfer status: success")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "foo.fr", "10.53.0.1", "Transfer status: success", named_port
+            )
+        )
     check_soa_noerror()
index 02e129f4ba1bc5265be419062a0008da634e1913..16d2d48fffdb45c8b7656fc6707a73aaddbde632 100644 (file)
@@ -13,9 +13,11 @@ import os
 
 from ede24.common import check_ns2_ready, check_soa_noerror, check_soa_servfail_ede24
 
+import isctest
 
-def test_ede24_expired(ns1, ns2):
-    check_ns2_ready(ns2)
+
+def test_ede24_expired(named_port, ns1, ns2):
+    check_ns2_ready(ns2, named_port)
 
     # Stop the primary and wait for expiration of the zone in the secondary.
     with ns2.watch_log_from_here() as watcher:
@@ -32,5 +34,9 @@ def test_ede24_expired(ns1, ns2):
     # Restart the primary and wait for the zone to be back up again.
     with ns2.watch_log_from_here() as watcher:
         ns1.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
-        watcher.wait_for_line("Transfer status: success")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "foo.fr", "10.53.0.1", "Transfer status: success", named_port
+            )
+        )
     check_soa_noerror()
index 90b77b8a556267ff25a1405b968e4cea0081a7c1..0b92b63e7be75ad973eedd0c5d4c923a7dddd76e 100644 (file)
@@ -14,8 +14,8 @@ import os
 from ede24.common import check_ns2_ready, check_soa_servfail_ede24
 
 
-def test_ede24_noloaded(ns1, ns2):
-    check_ns2_ready(ns2)
+def test_ede24_noloaded(named_port, ns1, ns2):
+    check_ns2_ready(ns2, named_port)
 
     # Stop all servers, and we'll restart only ns2.
     ns1.stop()
index b1731f5cd5c6f1e9399d9608d92f6244c09d1207..a817eaa4cd502565ff3080c9d0fdaaa2f77f42b4 100644 (file)
@@ -18,6 +18,7 @@ from . import (  # pylint: disable=redefined-builtin
     query,
     run,
     template,
+    transfer,
     vars,
 )
 
@@ -35,5 +36,6 @@ __all__ = [
     "query",
     "run",
     "template",
+    "transfer",
     "vars",
 ]
diff --git a/bin/tests/system/isctest/transfer.py b/bin/tests/system/isctest/transfer.py
new file mode 100644 (file)
index 0000000..95ff5ba
--- /dev/null
@@ -0,0 +1,47 @@
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0.  If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+from re import Pattern
+
+import re
+
+
+def transfer_message(
+    zone: str, source_ns: str | None, msg: str | Pattern, port: int | None = None
+) -> Pattern:
+    """Return the expected log message for an incoming zone transfer.
+
+    Mirrors the format produced by xfrin_log() in lib/dns/xfrin.c:
+
+        transfer of '<zone>/IN' from <source_ns>#<port>: <msg>
+
+    Always returns a compiled Pattern.  When source_ns or port is None,
+    the unknown part is replaced by a wildcard in the regex.
+
+    Args:
+        zone:      Zone name (without class, e.g. "example.com").
+        source_ns: Source nameserver IP address string (e.g. "10.53.0.1"),
+                   or None to match any source address.
+        msg:       Transfer status or other message as a plain string
+                   (e.g. "Transfer status: success"), which is regex-escaped,
+                   or a compiled Pattern whose .pattern is spliced in as-is
+                   for callers that need regex syntax in the message part.
+        port:      Source port number, or None to match any port.
+    """
+    source_str = re.escape(source_ns) if source_ns is not None else ".*"
+    port_str = str(port) if port is not None else "[0-9]+"
+    msg_str = msg.pattern if isinstance(msg, Pattern) else re.escape(msg)
+
+    return re.compile(
+        re.escape(f"transfer of '{zone}/IN' from ")
+        + f"{source_str}#{port_str}: "
+        + msg_str
+    )
index fd638b88bc801be87b7530b2b2c13e72eff3e850..51e303a5c94e4e071c69439465438d76dabd3251 100644 (file)
@@ -13,6 +13,8 @@
 from isctest.instance import NamedInstance
 from isctest.mark import live_internet_test
 
+import isctest
+
 
 @live_internet_test
 def test_mirror_root_zone(servers: dict[str, NamedInstance]):
@@ -23,4 +25,6 @@ def test_mirror_root_zone(servers: dict[str, NamedInstance]):
     ns1 = servers["ns1"]
     with ns1.watch_log_from_start() as watch_log:
         # TimeoutError is raised if the line is not found and the test will fail.
-        watch_log.wait_for_line("Transfer status: success")
+        watch_log.wait_for_line(
+            isctest.transfer.transfer_message(".", None, "Transfer status: success", 53)
+        )
index 2ecae569334fc51ef4cef5d036b5b1ab34631277..0553e0fd0f4ba54b63716ab57c0ff919c229dbd9 100644 (file)
@@ -28,7 +28,9 @@ def test_wait_for_zone_retransfer(named_port, ns6):
     with ns6.watch_log_from_here() as watcher:
         ns6.rndc("retransfer axfr-rndc-retransfer-force.")
         watcher.wait_for_line(
-            f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: received"
+            isctest.transfer.transfer_message(
+                "axfr-rndc-retransfer-force", "10.53.0.1", "received", named_port
+            )
         )
 
 
@@ -40,11 +42,21 @@ def test_cancel_ongoing_retransfer(named_port, ns6):
         with ns6.watch_log_from_here() as watcher_transfer_shutting_down:
             ns6.rndc("retransfer -force axfr-rndc-retransfer-force.")
             watcher_transfer_shutting_down.wait_for_line(
-                f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: Transfer status: shutting down"
+                isctest.transfer.transfer_message(
+                    "axfr-rndc-retransfer-force",
+                    "10.53.0.1",
+                    "Transfer status: shutting down",
+                    named_port,
+                )
             )
         isctest.log.info("Wait for the new transfer to complete successfully")
         watcher_transfer_success.wait_for_line(
-            f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: Transfer status: success"
+            isctest.transfer.transfer_message(
+                "axfr-rndc-retransfer-force",
+                "10.53.0.1",
+                "Transfer status: success",
+                named_port,
+            )
         )
 
 
index 6fcfcef78d629c02ff4560528dce7dd58fcb7361..077ffaa333fca1daabf28ba2ebf7eac455877c74 100644 (file)
@@ -40,18 +40,26 @@ def send_switch_control_command(command):
 
 
 @pytest.fixture(scope="module", autouse=True)
-def after_servers_start(templates, ns4):
+def after_servers_start(templates, named_port, ns4):
     # initial correctly-signed transfer should succeed
     send_switch_control_command("goodaxfr")
 
     with ns4.watch_log_from_here() as watcher:
         templates.render("ns4/named.conf", {"ns4_as_secondary_for_nil": True})
         ns4.reconfigure()
-        watcher.wait_for_line("Transfer status: success")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "nil", "10.53.0.5", "Transfer status: success", named_port
+            )
+        )
 
     with ns4.watch_log_from_here() as watcher_retransfer_nil_success:
         ns4.rndc("retransfer nil.")
-        watcher_retransfer_nil_success.wait_for_line("Transfer status: success")
+        watcher_retransfer_nil_success.wait_for_line(
+            isctest.transfer.transfer_message(
+                "nil", "10.53.0.5", "Transfer status: success", named_port
+            )
+        )
 
 
 def get_response(msg, server_ip, allow_empty_answer=False):
@@ -297,7 +305,7 @@ def test_make_ns4_secondary_for_nil():
     check_rdata_in_txt_record("initial AXFR")
 
 
-def test_handle_ixfr_notimp(ns4):
+def test_handle_ixfr_notimp(named_port, ns4):
     send_switch_control_command("ixfrnotimp")
     with ns4.watch_log_from_here() as watcher_transfer_success:
         with ns4.watch_log_from_here() as watcher_requesting_ixfr:
@@ -305,76 +313,105 @@ def test_handle_ixfr_notimp(ns4):
             watcher_requesting_ixfr.wait_for_line(
                 "zone nil/IN: requesting IXFR from 10.53.0.5"
             )
-        watcher_transfer_success.wait_for_line("Transfer status: success")
+        watcher_transfer_success.wait_for_line(
+            isctest.transfer.transfer_message(
+                "nil", "10.53.0.5", "Transfer status: success", named_port
+            )
+        )
 
     check_rdata_in_txt_record("IXFR NOTIMP")
 
 
 @pytest.mark.parametrize(
-    "command_file,expected_rdata,named_log_line",
+    "command_file,expected_rdata,named_log_line,xfrin_msg",
     [
         param(
             "unsigned",
             "unsigned AXFR",
             "Transfer status: expected a TSIG or SIG(0)",
+            True,
         ),
         param(
             "badkeydata",
             "bad keydata AXFR",
             "Transfer status: tsig verify failure",
+            True,
         ),
         param(
             "partial",
             "partially signed AXFR",
             "Transfer status: expected a TSIG or SIG(0)",
+            True,
         ),
         param(
             "unknownkey",
             "unknown key AXFR",
             "tsig key 'tsig_key': key name and algorithm do not match",
+            False,
         ),
         param(
             "wrongkey",
             "incorrect key AXFR",
             "tsig key 'tsig_key': key name and algorithm do not match",
+            False,
         ),
         param(
             "wrongname",
             "wrong question AXFR",
             "question name mismatch",
+            True,
         ),
         param(
             "badmessageid",
             "bad message id",
             "Transfer status: unexpected error",
+            True,
         ),
         param(
             "soamismatch",
             "SOA mismatch AXFR",
             "Transfer status: FORMERR",
+            True,
         ),
     ],
 )
-def test_under_signed_transfer(command_file, expected_rdata, named_log_line, ns4):
+def test_under_signed_transfer(
+    command_file, expected_rdata, named_log_line, xfrin_msg, named_port, ns4
+):
     send_switch_control_command(command_file)
     with ns4.watch_log_from_here() as watcher:
         ns4.rndc("retransfer nil.")
-        watcher.wait_for_line(named_log_line)
+        if xfrin_msg:
+            watcher.wait_for_line(
+                isctest.transfer.transfer_message(
+                    "nil", "10.53.0.5", named_log_line, named_port
+                )
+            )
+        else:
+            watcher.wait_for_line(named_log_line)
     check_rdata_in_txt_record(expected_rdata, should_exist=False)
 
 
-def test_handle_edns_notimp(ns4):
+def test_handle_edns_notimp(named_port, ns4):
     send_switch_control_command("ednsnotimp")
     with ns4.watch_log_from_here() as watcher:
         ns4.rndc("retransfer nil.")
-        watcher.wait_for_line("Transfer status: NOTIMP")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "nil", "10.53.0.5", "Transfer status: NOTIMP", named_port
+            )
+        )
 
 
-def test_handle_edns_formerr(ns4):
+def test_handle_edns_formerr(named_port, ns4):
     send_switch_control_command("ednsformerr")
     with ns4.watch_log_from_here() as watcher:
         ns4.rndc("retransfer nil.")
-        watcher.wait_for_line("Transfer status: success")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "nil", "10.53.0.5", "Transfer status: success", named_port
+            )
+        )
     check_rdata_in_txt_record("EDNS FORMERR")
 
 
@@ -436,9 +473,16 @@ def test_mapped_zone(named_port, ns3):
 
 
 # test that a zone with too many records is rejected (AXFR)
-def test_axfr_too_many_records(ns6):
+def test_axfr_too_many_records(named_port, ns6):
     with ns6.watch_log_from_start() as watcher:
-        watcher.wait_for_line(Re("'axfr-too-big/IN'.*: too many records"))
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "axfr-too-big",
+                "10.53.0.1",
+                "Transfer status: too many records",
+                named_port,
+            )
+        )
 
 
 # test that a zone with too many records is rejected (IXFR)
@@ -451,7 +495,14 @@ def test_ixfr_too_many_records(named_port, ns6):
         send
         """
         nsupdate(nsupdate_config)
-        watcher.wait_for_line("Transfer status: too many records")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "ixfr-too-big",
+                "10.53.0.1",
+                "Transfer status: too many records",
+                named_port,
+            )
+        )
 
 
 # test that a zone with too many diffs (IXFR) is retried with AXFR
@@ -466,7 +517,12 @@ def test_ixfr_too_many_diffs(named_port, ns6):
     update add the-35th-record.ixfr-too-many-diffs 0 TXT ixfr
     send
     """
-    log_sequence = ["too many diffs, retrying with AXFR", "Transfer status: success"]
+    log_sequence = [
+        "too many diffs, retrying with AXFR",
+        isctest.transfer.transfer_message(
+            "ixfr-too-many-diffs", "10.53.0.1", "Transfer status: success", named_port
+        ),
+    ]
     with ns6.watch_log_from_here() as watcher_transfer_status:
         nsupdate(nsupdate_config)
         watcher_transfer_status.wait_for_sequence(log_sequence)
@@ -477,10 +533,14 @@ def test_dig_and_named_axfr_stats(named_port, ns3):
     # Use ns3 logs for checking incoming transfer statistics as ns3 is a
     # secondary server (for ns1) for "xfer-stats".
     with ns3.watch_log_from_start() as watcher_transfer_completed:
-        pattern_transfer_completed = (
-            "Transfer completed: 16 messages, 10003 records, 218403 bytes"
+        watcher_transfer_completed.wait_for_line(
+            isctest.transfer.transfer_message(
+                "xfer-stats",
+                "10.53.0.1",
+                "Transfer completed: 16 messages, 10003 records, 218403 bytes",
+                named_port,
+            )
         )
-        watcher_transfer_completed.wait_for_line(pattern_transfer_completed)
 
     # Loop until the secondary server manages to transfer the "xfer-stats" zone so
     # that we can both check dig output and immediately proceed with the next test.
@@ -508,11 +568,18 @@ def test_dig_and_named_axfr_stats(named_port, ns3):
 # First, test that named tries the next primary in the list when the first one
 # fails (XoT -> Do53). Then, test that named tries the next primary in the list
 # when the first one is already marked as unreachable (XoT -> Do53).
-def test_xot_primary_try_next(ns6):
+def test_xot_primary_try_next(named_port, ns6):
     def retransfer_and_check_log():
         with ns6.watch_log_from_here(timeout=60) as watcher:
             ns6.rndc("retransfer xot-primary-try-next.")
-            watcher.wait_for_line("Transfer status: success")
+            watcher.wait_for_line(
+                isctest.transfer.transfer_message(
+                    "xot-primary-try-next",
+                    "10.53.0.1",
+                    "Transfer status: success",
+                    named_port,
+                )
+            )
 
     retransfer_and_check_log()
     retransfer_and_check_log()
index cb7553d171a3a1fc888ac563e054ea64d27bfa9c..310ebd4d6c9e0d5b5d55b9d2a5b035d1352a9a40 100644 (file)
@@ -25,9 +25,13 @@ def check_soa(ns, serial):
     )
 
 
-def wait_for_initial_xfrin(ns):
+def wait_for_initial_xfrin(ns, named_port):
     with ns.watch_log_from_start() as watcher:
-        watcher.wait_for_line("Transfer status: success")
+        watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "test", "10.53.0.1", "Transfer status: success", named_port
+            )
+        )
     check_soa(ns, 1)
 
 
@@ -39,11 +43,11 @@ def wait_for_sending_notify(ns1, ns, key_name):
         watcher.wait_for_line(pattern)
 
 
-def test_xfer_servers_list(ns1, ns2, ns3, ns4, templates):
-    # First, wait for ns2, ns3 and ns4 to xfrin foo.fr and answer it
-    wait_for_initial_xfrin(ns2)
-    wait_for_initial_xfrin(ns3)
-    wait_for_initial_xfrin(ns4)
+def test_xfer_servers_list(named_port, ns1, ns2, ns3, ns4, templates):
+    # First, wait for ns2, ns3 and ns4 to xfrin test. and answer it
+    wait_for_initial_xfrin(ns2, named_port)
+    wait_for_initial_xfrin(ns3, named_port)
+    wait_for_initial_xfrin(ns4, named_port)
 
     # ns1 initially notifies the secondaries using the respectively configured keys
     # - 10.53.0.2 has the key defined where `secondaries` is used
@@ -56,13 +60,25 @@ def test_xfer_servers_list(ns1, ns2, ns3, ns4, templates):
     for ns, key_name in seq:
         wait_for_sending_notify(ns1, ns, key_name)
 
-    # Then, ns1 update foo.fr. It notifies ns2, ns3 and ns4 about it
+    # Then, ns1 update test. It notifies ns2, ns3 and ns4 about it
     templates.render("ns1/test.db", {"serial": 2})
     with ns2.watch_log_from_here() as ns2_watcher, ns3.watch_log_from_here() as ns3_watcher, ns4.watch_log_from_here() as ns4_watcher:
         ns1.rndc("reload")
-        ns2_watcher.wait_for_line("Transfer status: success")
-        ns3_watcher.wait_for_line("Transfer status: success")
-        ns4_watcher.wait_for_line("Transfer status: success")
+        ns2_watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "test", "10.53.0.1", "Transfer status: success", named_port
+            )
+        )
+        ns3_watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "test", "10.53.0.1", "Transfer status: success", named_port
+            )
+        )
+        ns4_watcher.wait_for_line(
+            isctest.transfer.transfer_message(
+                "test", "10.53.0.1", "Transfer status: success", named_port
+            )
+        )
     check_soa(ns2, 2)
     check_soa(ns3, 2)
     check_soa(ns4, 2)
index 6d255085cb558638bab65579a746d4e837f3d2bb..2a95a6c1a8a444a596a4a71b651956436f4029e0 100644 (file)
@@ -73,9 +73,8 @@ def test_xferquota(named_port, ns1, ns2):
         isctest.check.rrsets_equal(ns1response.answer, ns2response.answer)
 
     query_and_compare(axfr_msg)
-    pattern = Re(
-        f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
-        f"Transfer completed: .*\\(serial 2\\)"
+    pattern = isctest.transfer.transfer_message(
+        "changing", "10.53.0.1", Re(r"Transfer completed: .*\(serial 2\)"), named_port
     )
     with ns2.watch_log_from_start(timeout=30) as watcher:
         watcher.wait_for_line(pattern)