# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from re import compile as Re
-
import dns.rcode
import pytest
@pytest.fixture(scope="module")
def transfers_complete(servers):
for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]:
- pattern = Re(
- f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed"
+ pattern = isctest.transfer.transfer_message(
+ zone, "10.53.0.1", "Transfer completed"
)
for ns in ["ns2", "ns3", "ns4", "ns5"]:
with servers[ns].watch_log_from_start() as watcher:
isctest.check.ede(res, EDECode.INVALID_DATA, edemsg)
-def check_ns2_ready(ns2):
+def check_ns2_ready(ns2, named_port):
# Sanity check that everything works first, once we're sure the foo.fr zone
# has transfered to ns2.
with ns2.watch_log_from_start() as watcher:
- watcher.wait_for_line("Transfer status: success")
+ watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "foo.fr", "10.53.0.1", "Transfer status: success", named_port
+ )
+ )
check_soa_noerror()
from ede24.common import check_ns2_ready, check_soa_noerror, check_soa_servfail_ede24
+import isctest
-def test_ede24_expired(ns1, ns2):
- check_ns2_ready(ns2)
+
+def test_ede24_expired(named_port, ns1, ns2):
+ check_ns2_ready(ns2, named_port)
# Stop the primary and wait for expiration of the zone in the secondary.
with ns2.watch_log_from_here() as watcher:
# Restart the primary and wait for the zone to be back up again.
with ns2.watch_log_from_here() as watcher:
ns1.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
- watcher.wait_for_line("Transfer status: success")
+ watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "foo.fr", "10.53.0.1", "Transfer status: success", named_port
+ )
+ )
check_soa_noerror()
from ede24.common import check_ns2_ready, check_soa_servfail_ede24
-def test_ede24_noloaded(ns1, ns2):
- check_ns2_ready(ns2)
+def test_ede24_noloaded(named_port, ns1, ns2):
+ check_ns2_ready(ns2, named_port)
# Stop all servers, and we'll restart only ns2.
ns1.stop()
query,
run,
template,
+ transfer,
vars,
)
"query",
"run",
"template",
+ "transfer",
"vars",
]
--- /dev/null
+# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
+#
+# SPDX-License-Identifier: MPL-2.0
+#
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, you can obtain one at https://mozilla.org/MPL/2.0/.
+#
+# See the COPYRIGHT file distributed with this work for additional
+# information regarding copyright ownership.
+
+from re import Pattern
+
+import re
+
+
+def transfer_message(
+ zone: str, source_ns: str | None, msg: str | Pattern, port: int | None = None
+) -> Pattern:
+ """Return the expected log message for an incoming zone transfer.
+
+ Mirrors the format produced by xfrin_log() in lib/dns/xfrin.c:
+
+ transfer of '<zone>/IN' from <source_ns>#<port>: <msg>
+
+ Always returns a compiled Pattern. When source_ns or port is None,
+ the unknown part is replaced by a wildcard in the regex.
+
+ Args:
+ zone: Zone name (without class, e.g. "example.com").
+ source_ns: Source nameserver IP address string (e.g. "10.53.0.1"),
+ or None to match any source address.
+ msg: Transfer status or other message as a plain string
+ (e.g. "Transfer status: success"), which is regex-escaped,
+ or a compiled Pattern whose .pattern is spliced in as-is
+ for callers that need regex syntax in the message part.
+ port: Source port number, or None to match any port.
+ """
+ source_str = re.escape(source_ns) if source_ns is not None else ".*"
+ port_str = str(port) if port is not None else "[0-9]+"
+ msg_str = msg.pattern if isinstance(msg, Pattern) else re.escape(msg)
+
+ return re.compile(
+ re.escape(f"transfer of '{zone}/IN' from ")
+ + f"{source_str}#{port_str}: "
+ + msg_str
+ )
from isctest.instance import NamedInstance
from isctest.mark import live_internet_test
+import isctest
+
@live_internet_test
def test_mirror_root_zone(servers: dict[str, NamedInstance]):
ns1 = servers["ns1"]
with ns1.watch_log_from_start() as watch_log:
# TimeoutError is raised if the line is not found and the test will fail.
- watch_log.wait_for_line("Transfer status: success")
+ watch_log.wait_for_line(
+ isctest.transfer.transfer_message(".", None, "Transfer status: success", 53)
+ )
with ns6.watch_log_from_here() as watcher:
ns6.rndc("retransfer axfr-rndc-retransfer-force.")
watcher.wait_for_line(
- f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: received"
+ isctest.transfer.transfer_message(
+ "axfr-rndc-retransfer-force", "10.53.0.1", "received", named_port
+ )
)
with ns6.watch_log_from_here() as watcher_transfer_shutting_down:
ns6.rndc("retransfer -force axfr-rndc-retransfer-force.")
watcher_transfer_shutting_down.wait_for_line(
- f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: Transfer status: shutting down"
+ isctest.transfer.transfer_message(
+ "axfr-rndc-retransfer-force",
+ "10.53.0.1",
+ "Transfer status: shutting down",
+ named_port,
+ )
)
isctest.log.info("Wait for the new transfer to complete successfully")
watcher_transfer_success.wait_for_line(
- f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: Transfer status: success"
+ isctest.transfer.transfer_message(
+ "axfr-rndc-retransfer-force",
+ "10.53.0.1",
+ "Transfer status: success",
+ named_port,
+ )
)
@pytest.fixture(scope="module", autouse=True)
-def after_servers_start(templates, ns4):
+def after_servers_start(templates, named_port, ns4):
# initial correctly-signed transfer should succeed
send_switch_control_command("goodaxfr")
with ns4.watch_log_from_here() as watcher:
templates.render("ns4/named.conf", {"ns4_as_secondary_for_nil": True})
ns4.reconfigure()
- watcher.wait_for_line("Transfer status: success")
+ watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "nil", "10.53.0.5", "Transfer status: success", named_port
+ )
+ )
with ns4.watch_log_from_here() as watcher_retransfer_nil_success:
ns4.rndc("retransfer nil.")
- watcher_retransfer_nil_success.wait_for_line("Transfer status: success")
+ watcher_retransfer_nil_success.wait_for_line(
+ isctest.transfer.transfer_message(
+ "nil", "10.53.0.5", "Transfer status: success", named_port
+ )
+ )
def get_response(msg, server_ip, allow_empty_answer=False):
check_rdata_in_txt_record("initial AXFR")
-def test_handle_ixfr_notimp(ns4):
+def test_handle_ixfr_notimp(named_port, ns4):
send_switch_control_command("ixfrnotimp")
with ns4.watch_log_from_here() as watcher_transfer_success:
with ns4.watch_log_from_here() as watcher_requesting_ixfr:
watcher_requesting_ixfr.wait_for_line(
"zone nil/IN: requesting IXFR from 10.53.0.5"
)
- watcher_transfer_success.wait_for_line("Transfer status: success")
+ watcher_transfer_success.wait_for_line(
+ isctest.transfer.transfer_message(
+ "nil", "10.53.0.5", "Transfer status: success", named_port
+ )
+ )
check_rdata_in_txt_record("IXFR NOTIMP")
@pytest.mark.parametrize(
- "command_file,expected_rdata,named_log_line",
+ "command_file,expected_rdata,named_log_line,xfrin_msg",
[
param(
"unsigned",
"unsigned AXFR",
"Transfer status: expected a TSIG or SIG(0)",
+ True,
),
param(
"badkeydata",
"bad keydata AXFR",
"Transfer status: tsig verify failure",
+ True,
),
param(
"partial",
"partially signed AXFR",
"Transfer status: expected a TSIG or SIG(0)",
+ True,
),
param(
"unknownkey",
"unknown key AXFR",
"tsig key 'tsig_key': key name and algorithm do not match",
+ False,
),
param(
"wrongkey",
"incorrect key AXFR",
"tsig key 'tsig_key': key name and algorithm do not match",
+ False,
),
param(
"wrongname",
"wrong question AXFR",
"question name mismatch",
+ True,
),
param(
"badmessageid",
"bad message id",
"Transfer status: unexpected error",
+ True,
),
param(
"soamismatch",
"SOA mismatch AXFR",
"Transfer status: FORMERR",
+ True,
),
],
)
-def test_under_signed_transfer(command_file, expected_rdata, named_log_line, ns4):
+def test_under_signed_transfer(
+ command_file, expected_rdata, named_log_line, xfrin_msg, named_port, ns4
+):
send_switch_control_command(command_file)
with ns4.watch_log_from_here() as watcher:
ns4.rndc("retransfer nil.")
- watcher.wait_for_line(named_log_line)
+ if xfrin_msg:
+ watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "nil", "10.53.0.5", named_log_line, named_port
+ )
+ )
+ else:
+ watcher.wait_for_line(named_log_line)
check_rdata_in_txt_record(expected_rdata, should_exist=False)
-def test_handle_edns_notimp(ns4):
+def test_handle_edns_notimp(named_port, ns4):
send_switch_control_command("ednsnotimp")
with ns4.watch_log_from_here() as watcher:
ns4.rndc("retransfer nil.")
- watcher.wait_for_line("Transfer status: NOTIMP")
+ watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "nil", "10.53.0.5", "Transfer status: NOTIMP", named_port
+ )
+ )
-def test_handle_edns_formerr(ns4):
+def test_handle_edns_formerr(named_port, ns4):
send_switch_control_command("ednsformerr")
with ns4.watch_log_from_here() as watcher:
ns4.rndc("retransfer nil.")
- watcher.wait_for_line("Transfer status: success")
+ watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "nil", "10.53.0.5", "Transfer status: success", named_port
+ )
+ )
check_rdata_in_txt_record("EDNS FORMERR")
# test that a zone with too many records is rejected (AXFR)
-def test_axfr_too_many_records(ns6):
+def test_axfr_too_many_records(named_port, ns6):
with ns6.watch_log_from_start() as watcher:
- watcher.wait_for_line(Re("'axfr-too-big/IN'.*: too many records"))
+ watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "axfr-too-big",
+ "10.53.0.1",
+ "Transfer status: too many records",
+ named_port,
+ )
+ )
# test that a zone with too many records is rejected (IXFR)
send
"""
nsupdate(nsupdate_config)
- watcher.wait_for_line("Transfer status: too many records")
+ watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "ixfr-too-big",
+ "10.53.0.1",
+ "Transfer status: too many records",
+ named_port,
+ )
+ )
# test that a zone with too many diffs (IXFR) is retried with AXFR
update add the-35th-record.ixfr-too-many-diffs 0 TXT ixfr
send
"""
- log_sequence = ["too many diffs, retrying with AXFR", "Transfer status: success"]
+ log_sequence = [
+ "too many diffs, retrying with AXFR",
+ isctest.transfer.transfer_message(
+ "ixfr-too-many-diffs", "10.53.0.1", "Transfer status: success", named_port
+ ),
+ ]
with ns6.watch_log_from_here() as watcher_transfer_status:
nsupdate(nsupdate_config)
watcher_transfer_status.wait_for_sequence(log_sequence)
# Use ns3 logs for checking incoming transfer statistics as ns3 is a
# secondary server (for ns1) for "xfer-stats".
with ns3.watch_log_from_start() as watcher_transfer_completed:
- pattern_transfer_completed = (
- "Transfer completed: 16 messages, 10003 records, 218403 bytes"
+ watcher_transfer_completed.wait_for_line(
+ isctest.transfer.transfer_message(
+ "xfer-stats",
+ "10.53.0.1",
+ "Transfer completed: 16 messages, 10003 records, 218403 bytes",
+ named_port,
+ )
)
- watcher_transfer_completed.wait_for_line(pattern_transfer_completed)
# Loop until the secondary server manages to transfer the "xfer-stats" zone so
# that we can both check dig output and immediately proceed with the next test.
# First, test that named tries the next primary in the list when the first one
# fails (XoT -> Do53). Then, test that named tries the next primary in the list
# when the first one is already marked as unreachable (XoT -> Do53).
-def test_xot_primary_try_next(ns6):
+def test_xot_primary_try_next(named_port, ns6):
def retransfer_and_check_log():
with ns6.watch_log_from_here(timeout=60) as watcher:
ns6.rndc("retransfer xot-primary-try-next.")
- watcher.wait_for_line("Transfer status: success")
+ watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "xot-primary-try-next",
+ "10.53.0.1",
+ "Transfer status: success",
+ named_port,
+ )
+ )
retransfer_and_check_log()
retransfer_and_check_log()
)
-def wait_for_initial_xfrin(ns):
+def wait_for_initial_xfrin(ns, named_port):
with ns.watch_log_from_start() as watcher:
- watcher.wait_for_line("Transfer status: success")
+ watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "test", "10.53.0.1", "Transfer status: success", named_port
+ )
+ )
check_soa(ns, 1)
watcher.wait_for_line(pattern)
-def test_xfer_servers_list(ns1, ns2, ns3, ns4, templates):
- # First, wait for ns2, ns3 and ns4 to xfrin foo.fr and answer it
- wait_for_initial_xfrin(ns2)
- wait_for_initial_xfrin(ns3)
- wait_for_initial_xfrin(ns4)
+def test_xfer_servers_list(named_port, ns1, ns2, ns3, ns4, templates):
+ # First, wait for ns2, ns3 and ns4 to xfrin test. and answer it
+ wait_for_initial_xfrin(ns2, named_port)
+ wait_for_initial_xfrin(ns3, named_port)
+ wait_for_initial_xfrin(ns4, named_port)
# ns1 initially notifies the secondaries using the respectively configured keys
# - 10.53.0.2 has the key defined where `secondaries` is used
for ns, key_name in seq:
wait_for_sending_notify(ns1, ns, key_name)
- # Then, ns1 update foo.fr. It notifies ns2, ns3 and ns4 about it
+ # Then, ns1 update test. It notifies ns2, ns3 and ns4 about it
templates.render("ns1/test.db", {"serial": 2})
with ns2.watch_log_from_here() as ns2_watcher, ns3.watch_log_from_here() as ns3_watcher, ns4.watch_log_from_here() as ns4_watcher:
ns1.rndc("reload")
- ns2_watcher.wait_for_line("Transfer status: success")
- ns3_watcher.wait_for_line("Transfer status: success")
- ns4_watcher.wait_for_line("Transfer status: success")
+ ns2_watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "test", "10.53.0.1", "Transfer status: success", named_port
+ )
+ )
+ ns3_watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "test", "10.53.0.1", "Transfer status: success", named_port
+ )
+ )
+ ns4_watcher.wait_for_line(
+ isctest.transfer.transfer_message(
+ "test", "10.53.0.1", "Transfer status: success", named_port
+ )
+ )
check_soa(ns2, 2)
check_soa(ns3, 2)
check_soa(ns4, 2)
isctest.check.rrsets_equal(ns1response.answer, ns2response.answer)
query_and_compare(axfr_msg)
- pattern = Re(
- f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
- f"Transfer completed: .*\\(serial 2\\)"
+ pattern = isctest.transfer.transfer_message(
+ "changing", "10.53.0.1", Re(r"Transfer completed: .*\(serial 2\)"), named_port
)
with ns2.watch_log_from_start(timeout=30) as watcher:
watcher.wait_for_line(pattern)