# information regarding copyright ownership.
import concurrent.futures
+import os
+import subprocess
import time
import pytest
-import isctest
-
pytestmark = pytest.mark.extra_artifacts(
[
"ns*/*.nzf*",
["delzone", domain],
]
+ args = [os.environ["RNDC"]] + ns3.rndc_args.split()
while not test_state["finished"]:
for command in rndc_commands:
- ns3.rndc(" ".join(command), ignore_errors=True, log=False)
+ # avoid using ns3.rndc() directly to avoid log spam
+ subprocess.run(args + command, timeout=10, check=False)
def check_if_server_is_responsive(ns3):
"""
Check if server status can be successfully retrieved using "rndc status"
"""
- try:
- ns3.rndc("status", log=False)
- return True
- except isctest.rndc.RNDCException:
- return False
+ cmd = ns3.rndc("status", raise_on_exception=False)
+ return cmd.rc == 0
def test_rndc_deadlock(ns3):
)
def test_showzone_static(ns1, templates, allow):
templates.render("ns1/named.conf", {"allownewzones": allow})
- ns1.rndc("reload", log=False)
- zoneconfig = ns1.rndc("showzone inlinesec.example", log=False)
+ ns1.rndc("reload")
+ response = ns1.rndc("showzone inlinesec.example")
assert (
- zoneconfig
- == 'zone "inlinesec.example" { type primary; file "inlinesec.db"; };\n'
+ 'zone "inlinesec.example" { type primary; file "inlinesec.db"; };'
+ in response.out
)
assert val != 0
-def rekey(zone):
- rndc = os.getenv("RNDC")
- assert rndc is not None
-
- port = os.getenv("CONTROLPORT")
- assert port is not None
-
- # rndc loadkeys.
- rndc_cmd = [
- rndc,
- "-c",
- "../_common/rndc.conf",
- "-p",
- port,
- "-s",
- "10.53.0.9",
- "loadkeys",
- zone,
- ]
- controller = isctest.run.cmd(rndc_cmd)
-
- if controller.rc != 0:
- isctest.log.error(f"rndc loadkeys {zone} failed")
-
- assert controller.rc == 0
-
-
class CheckDSTest(NamedTuple):
zone: str
logs_to_wait_for: Tuple[str]
for log_string in params.logs_to_wait_for:
line = f"zone {params.zone}/IN (signed): checkds: {log_string}"
while line not in ns9.log:
- rekey(params.zone)
+ ns9.rndc(f"loadkeys {params.zone}")
time_remaining -= 1
assert time_remaining, f'Timed out waiting for "{log_string}" to be logged'
time.sleep(1)
from re import compile as Re
-import isctest
-
def test_configloading_log(ns1):
"""
watcher.wait_for_sequence(log_sequence)
with ns1.watch_log_from_here() as watcher:
- ns1.rndc("reconfig", log=False)
+ ns1.rndc("reconfig")
watcher.wait_for_sequence(log_sequence)
with ns1.watch_log_from_here() as watcher:
- ns1.rndc("reload", log=False)
+ ns1.rndc("reload")
watcher.wait_for_sequence(log_sequence)
with ns1.watch_log_from_here() as watcher:
templates.render("ns1/named.conf", {"wrongoption": True})
- try:
- ns1.rndc("reload", log=False)
- assert False
- except isctest.rndc.RNDCException:
- watcher.wait_for_sequence(log_sequence)
+ cmd = ns1.rndc("reload", raise_on_exception=False)
+ assert cmd.rc != 0
+ watcher.wait_for_sequence(log_sequence)
templates.render("ns3/named.conf", {"long_sigs": True})
with ns3.watch_log_from_here() as watcher:
- ns3.reconfigure(log=False)
+ ns3.reconfigure()
watcher.wait_for_line(
"zone_needdump: zone siginterval.example/IN (signed): enter"
)
assert after != before
- ns3.rndc("sign siginterval.example", log=False)
+ ns3.rndc("sign siginterval.example")
msg = isctest.query.create("siginterval.example.", "SOA")
res = isctest.query.tcp(msg, "10.53.0.3")
)
def test_rndc_signing_except(cmd, ns3):
# check that 'rndc signing' errors are handled
- with pytest.raises(isctest.rndc.RNDCException):
- ns3.rndc(cmd, log=False)
- ns3.rndc("status", log=False)
+ ret = ns3.rndc(cmd, raise_on_exception=False)
+ assert ret.rc != 0
def test_rndc_signing_output(ns3):
- response = ns3.rndc("signing -list dynamic.example", log=False)
- assert "No signing records found" in response
+ response = ns3.rndc("signing -list dynamic.example")
+ assert "No signing records found" in response.out
def test_zonestatus_signing(ns3):
# for the name and type, and check that the resigning time is
# after the inception and before the expiration.
- response = ns3.rndc("zonestatus secure.example", log=False)
+ response = ns3.rndc("zonestatus secure.example")
# next resign node: secure.example/DNSKEY
- nrn = [r for r in response.splitlines() if "next resign node" in r][0]
+ nrn = [r for r in response.out.splitlines() if "next resign node" in r][0]
rdname, rdtype = nrn.split()[3].split("/")
# next resign time: Thu, 24 Apr 2014 10:38:16 GMT
- nrt = [r for r in response.splitlines() if "next resign time" in r][0]
+ nrt = [r for r in response.out.splitlines() if "next resign time" in r][0]
rtime = " ".join(nrt.split()[3:])
rt = time.strptime(rtime, "%a, %d %b %Y %H:%M:%S %Z")
when = int(time.strftime("%s", rt))
def loadkeys():
pattern = Re(f"{zone}/IN.*next key event")
with ns2.watch_log_from_here() as watcher:
- ns2.rndc(f"loadkeys {zone}", log=False)
+ ns2.rndc(f"loadkeys {zone}")
watcher.wait_for_line(pattern)
ksk_only_types = ["DNSKEY", "CDNSKEY", "CDS"]
ZSKID2 = getkeyid(ZSK2)
isctest.log.info("prepublish new ZSK")
- ns2.rndc(f"dnssec -rollover -key {ZSKID} {zone}", log=False)
+ ns2.rndc(f"dnssec -rollover -key {ZSKID} {zone}")
isctest.run.retry_with_timeout(check_zskcount, 5)
isctest.log.info("make the new ZSK active")
settime("-sKns2", "-k", "HIDDEN", "now", "-z", "HIDDEN", "now", "-Dnow", ZSK)
settime("-sKns2", "-k", "OMNIPRESENT", "now", "-z", "OMNIPRESENT", "now", ZSK2)
loadkeys()
- ns2.rndc(f"dnssec -rollover -key {ZSKID2} {zone}", log=False)
+ ns2.rndc(f"dnssec -rollover -key {ZSKID2} {zone}")
with ns2.watch_log_from_start() as watcher:
watcher.wait_for_line(f"{ZSKID3} (ZSK) is now published")
# check that "rndc secroots" dumps the trusted keys
key = int(getfrom("ns1/managed.key.id"))
alg = os.environ["DEFAULT_ALGORITHM"]
- expected = f"./{alg}/{key} ; static"
- response = ns4.rndc("secroots -", log=False).splitlines()
- assert expected in response
- assert len(response) == 10
+ response = ns4.rndc("secroots -")
+ assert f"./{alg}/{key} ; static" in response.out
+ assert len(response.out.splitlines()) == 10
def test_positive_validation_nsec():
def test_cache(ns4):
# check that key id's are logged when dumping the cache
- ns4.rndc("dumpdb -cache", log=False)
+ ns4.rndc("dumpdb -cache")
dumpdb = isctest.text.TextFile("ns4/named_dump.db")
assert "; key id = " in dumpdb
isctest.check.noadflag(res2)
# insecurity proof using negative cache
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("insecure.example", "DS", cd=True)
isctest.query.tcp(msg, "10.53.0.4")
msg = isctest.query.create("target.peer-ns-spoof", "A", cd=True)
res = isctest.query.tcp(msg, "10.53.0.4")
isctest.check.servfail(res)
- ns4.rndc("dumpdb", log=False)
+ ns4.rndc("dumpdb")
dumpdb = isctest.text.TextFile("ns4/named_dump.db")
assert "10.53.0.100" in dumpdb
"ns2/peer.peer-ns-spoof.db.next", "ns2/peer.peer-ns-spoof.db.signed"
)
with ns2.watch_log_from_here() as watcher:
- ns2.rndc("reload peer.peer-ns-spoof", log=False)
+ ns2.rndc("reload peer.peer-ns-spoof")
watcher.wait_for_line("zone peer.peer-ns-spoof/IN: loaded serial 2000042408")
# and check we can resolve with the correct server address
"ns2/dnskey-rrsigs-stripped.db.next", "ns2/dnskey-rrsigs-stripped.db.signed"
)
with ns2.watch_log_from_here() as watcher:
- ns2.rndc("reload dnskey-rrsigs-stripped", log=False)
+ ns2.rndc("reload dnskey-rrsigs-stripped")
watcher.wait_for_line(
"zone dnskey-rrsigs-stripped/IN: loaded serial 2000042408"
)
"ns2/ds-rrsigs-stripped.db.next", "ns2/ds-rrsigs-stripped.db.signed"
)
with ns2.watch_log_from_here() as watcher:
- ns2.rndc("reload ds-rrsigs-stripped", log=False)
+ ns2.rndc("reload ds-rrsigs-stripped")
watcher.wait_for_line("zone ds-rrsigs-stripped/IN: loaded serial 2000042408")
# and check we can now resolve with the correct server address
isctest.check.adflag(res2)
# check recovery with mismatching NS
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("inconsistent", "NS", dnssec=False, cd=True)
res = isctest.query.tcp(msg, "10.53.0.4")
isctest.check.noadflag(res)
def test_validating_forwarder(ns4, ns9):
# check validating forwarder behavior with mismatching NS
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("inconsistent", "NS", dnssec=False, cd=True)
res = isctest.query.tcp(msg, "10.53.0.9")
isctest.check.noerror(res)
isctest.check.adflag(res)
# check validating forwarder sends CD to validate with a local trust anchor
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("localkey.example", "SOA")
res = isctest.query.tcp(msg, "10.53.0.4")
isctest.check.servfail(res)
isctest.check.noerror(res)
# test TTL is capped at RRSIG expiry time
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("expiring.example", "SOA", cd=True)
res1 = isctest.query.tcp(msg, "10.53.0.4")
msg = isctest.query.create("expiring.example", "SOA")
assert rrset.ttl <= 60
# test TTL is capped at RRSIG expiry time in the additional section (NS)
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("expiring.example", "NS", cd=True)
res1 = isctest.query.tcp(msg, "10.53.0.4")
msg = isctest.query.create("expiring.example", "NS")
assert rrset.ttl <= 60
# test TTL is capped at RRSIG expiry time in the additional section (MX)
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("expiring.example", "MX", cd=True)
res1 = isctest.query.tcp(msg, "10.53.0.4")
msg = isctest.query.create("expiring.example", "MX")
# a negative cache entry with trust level "pending" for the DS. prime
# with a +cd DS query to produce the negative cache entry, then send a
# query that uses that entry as part of the validation process.
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("insecure.example", "DS", cd=True)
res = isctest.query.tcp(msg, "10.53.0.4")
isctest.check.noerror(res)
def test_accept_expired(ns4):
# test TTL of about-to-expire rrsets with accept-expired
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("expiring.example", "SOA")
msg.flags |= flags.CD
res1 = isctest.query.tcp(msg, "10.53.0.4")
# test TTL is capped at RRSIG expiry time in the additional section
# with accept-expired
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("expiring.example", "MX")
msg.flags |= flags.CD
res1 = isctest.query.tcp(msg, "10.53.0.4")
assert rrset.ttl <= 120
# test TTL of expired rrsets with accept-expired
- ns4.rndc("flush", log=False)
+ ns4.rndc("flush")
msg = isctest.query.create("expired.example", "SOA")
msg.flags |= flags.CD
res1 = isctest.query.tcp(msg, "10.53.0.4")
# check that "rndc secroots" dumps the trusted keys
key = int(getfrom("ns1/managed.key.id"))
alg = os.environ["DEFAULT_ALGORITHM"]
- expected = f"./{alg}/{key} ; managed"
- response = ns4.rndc("secroots -", log=False).splitlines()
- assert expected in response
- assert len(response) == 10
+ response = ns4.rndc("secroots -")
+ assert f"./{alg}/{key} ; managed" in response.out
+ assert len(response.out.splitlines()) == 10
def test_positive_validation_nsec_managed():
def test_keydata_storage(ns4):
- ns4.rndc("managed-keys sync", log=False)
+ ns4.rndc("managed-keys sync")
with isctest.log.WatchLogFromStart("ns4/managed-keys.bind") as watcher:
watcher.wait_for_line(["KEYDATA", "next refresh:"])
# check that "rndc secroots" dumps the trusted keys with multiple views
key = int(getfrom("ns1/managed.key.id"))
alg = os.environ["DEFAULT_ALGORITHM"]
- expected = f"./{alg}/{key} ; static"
- response = ns4.rndc("secroots -", log=False).splitlines()
- assert expected in response, response
- assert len(response) == 17
+ response = ns4.rndc("secroots -")
+ assert f"./{alg}/{key} ; static" in response.out
+ assert len(response.out.splitlines()) == 17
from . import instance
from . import query
from . import kasp
-from . import rndc
from . import run
from . import template
from . import log
from typing import List, NamedTuple, Optional
-import logging
import os
from pathlib import Path
import re
import dns.message
import dns.rcode
-from .log import debug, info, WatchLogFromStart, WatchLogFromHere
-from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor
-from .run import perl
+from .log import debug, WatchLogFromStart, WatchLogFromHere
+from .run import CmdResult, EnvCmd, perl
from .query import udp
from .text import TextFile
identifier: str,
num: Optional[int] = None,
ports: Optional[NamedPorts] = None,
- rndc_logger: Optional[logging.Logger] = None,
- rndc_executor: Optional[RNDCExecutor] = None,
) -> None:
"""
`identifier` is the name of the instance's directory
this `named` instance is listening for various types of traffic (both
DNS traffic and RNDC commands). Defaults to ports set by the test
framework.
-
- `rndc_logger` is the `logging.Logger` to use for logging RNDC
- commands sent to this `named` instance.
-
- `rndc_executor` is an object implementing the `RNDCExecutor` interface
- that is used for executing RNDC commands on this `named` instance.
"""
self.directory = Path(identifier).absolute()
if not self.directory.is_dir():
ports = NamedPorts.from_env()
self.ports = ports
self.log = TextFile(os.path.join(identifier, "named.run"))
- self._rndc_executor = rndc_executor or RNDCBinaryExecutor()
- self._rndc_logger = rndc_logger
+
+ self._rndc_conf = Path("../_common/rndc.conf").absolute()
+ self._rndc = EnvCmd("RNDC", self.rndc_args)
+
+ @property
+ def rndc_args(self) -> str:
+ """Base arguments for calling RNDC to control the instance."""
+ return f"-c {self._rndc_conf} -s {self.ip} -p {self.ports.rndc}"
@property
def ip(self) -> str:
assert num is None or num == parsed_num, "mismatched num and identifier"
return parsed_num
- def rndc(self, command: str, ignore_errors: bool = False, log: bool = True) -> str:
+ def rndc(self, command: str, timeout=10, **kwargs) -> CmdResult:
"""
Send `command` to this named instance using RNDC. Return the server's
response.
- If the RNDC command fails, an `RNDCException` is raised unless
- `ignore_errors` is set to `True`.
-
- The RNDC command will be logged to `rndc.log` (along with the server's
- response) unless `log` is set to `False`.
-
- ```python
- def test_foo(servers):
- # Send the "status" command to ns1. An `RNDCException` will be
- # raised if the RNDC command fails. This command will be logged.
- response = servers["ns1"].rndc("status")
-
- # Send the "thaw foo" command to ns2. No exception will be raised
- # in case the RNDC command fails. This command will be logged
- # (even if it fails).
- response = servers["ns2"].rndc("thaw foo", ignore_errors=True)
-
- # Send the "stop" command to ns3. An `RNDCException` will be
- # raised if the RNDC command fails, but this command will not be
- # logged (the server's response will still be returned to the
- # caller, though).
- response = servers["ns3"].rndc("stop", log=False)
-
- # Send the "halt" command to ns4 in "fire & forget mode": no
- # exceptions will be raised and no logging will take place (the
- # server's response will still be returned to the caller, though).
- response = servers["ns4"].rndc("stop", ignore_errors=True, log=False)
- ```
+ To suppress exceptions, redirect outputs, control logging, change
+ the timeout, etc., use keyword arguments which are passed to
+ isctest.cmd.run().
"""
- try:
- response = self._rndc_executor.call(self.ip, self.ports.rndc, command)
- if log:
- self._rndc_log(command, response)
- except RNDCException as exc:
- response = str(exc)
- if log:
- self._rndc_log(command, response)
- if not ignore_errors:
- raise
-
- return response
+ return self._rndc(command, timeout=timeout, **kwargs)
def nsupdate(
self, update_msg: dns.message.Message, expected_rcode=dns.rcode.NOERROR
"""
return WatchLogFromHere(self.log.path, timeout)
- def reconfigure(self, **kwargs) -> None:
+ def reconfigure(self, **kwargs) -> CmdResult:
"""
Reconfigure this named `instance` and wait until reconfiguration is
- finished. Raise an `RNDCException` if reconfiguration fails.
+ finished.
"""
with self.watch_log_from_here() as watcher:
- self.rndc("reconfig", **kwargs)
+ cmd = self.rndc("reconfig", **kwargs)
watcher.wait_for_line("any newly configured zones are now loaded")
-
- def _rndc_log(self, command: str, response: str) -> None:
- """
- Log an `rndc` invocation (and its output) to the `rndc.log` file in the
- current working directory.
- """
- fmt = '%(ip)s: "%(command)s"\n%(separator)s\n%(response)s%(separator)s'
- args = {
- "ip": self.ip,
- "command": command,
- "separator": "-" * 80,
- "response": response,
- }
- if self._rndc_logger is None:
- info(fmt, args)
- else:
- self._rndc_logger.info(fmt, args)
+ return cmd
def stop(self, args: Optional[List[str]] = None) -> None:
"""Stop the instance."""
v = "-v "
if view is None:
- response = server.rndc(f"dnssec -status {v}{zone}", log=False)
+ response = server.rndc(f"dnssec -status {v}{zone}")
else:
- response = server.rndc(f"dnssec -status {v}{zone} in {view}", log=False)
+ response = server.rndc(f"dnssec -status {v}{zone} in {view}")
if policy is None:
- assert "Zone does not have dnssec-policy" in response
+ assert "Zone does not have dnssec-policy" in response.out
return
- assert f"DNSSEC status for zone '{zone}' using policy '{policy}'" in response
+ assert f"DNSSEC status for zone '{zone}' using policy '{policy}'" in response.out
for key in keys:
if not key.external:
- assert f"{key.role()} {key.tag}" in response
+ assert f"{key.role()} {key.tag}" in response.out
def _check_signatures(
+++ /dev/null
-# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
-#
-# SPDX-License-Identifier: MPL-2.0
-#
-# This Source Code Form is subject to the terms of the Mozilla Public
-# License, v. 2.0. If a copy of the MPL was not distributed with this
-# file, you can obtain one at https://mozilla.org/MPL/2.0/.
-#
-# See the COPYRIGHT file distributed with this work for additional
-# information regarding copyright ownership.
-
-import abc
-import os
-import subprocess
-
-
-class RNDCExecutor(abc.ABC):
- """
- An interface which RNDC executors have to implement in order for the
- `NamedInstance` class to be able to use them.
- """
-
- @abc.abstractmethod
- def call(self, ip: str, port: int, command: str) -> str:
- """
- Send RNDC `command` to the `named` instance at `ip:port` and return the
- server's response.
- """
-
-
-class RNDCException(Exception):
- """
- Raised by classes implementing the `RNDCExecutor` interface when sending an
- RNDC command fails for any reason.
- """
-
-
-class RNDCBinaryExecutor(RNDCExecutor):
- """
- An `RNDCExecutor` which sends RNDC commands to servers using the `rndc`
- binary.
- """
-
- def __init__(self) -> None:
- """
- This class needs the `RNDC` environment variable to be set to the path
- to the `rndc` binary to use.
- """
- rndc_path = os.environ.get("RNDC", "/bin/false")
- rndc_conf = os.path.join("..", "_common", "rndc.conf")
- self._base_cmdline = [rndc_path, "-c", rndc_conf]
-
- def call(self, ip: str, port: int, command: str) -> str:
- """
- Send RNDC `command` to the `named` instance at `ip:port` and return the
- server's response.
- """
- cmdline = self._base_cmdline[:]
- cmdline.extend(["-s", ip])
- cmdline.extend(["-p", str(port)])
- cmdline.extend(command.split())
-
- try:
- return subprocess.check_output(
- cmdline, stderr=subprocess.STDOUT, timeout=10, encoding="utf-8"
- )
- except subprocess.SubprocessError as exc:
- msg = getattr(exc, "output", "RNDC exception occurred")
- raise RNDCException(msg) from exc
import os
import shutil
+import subprocess
import time
from datetime import timedelta
f"expected updates {expected_updates} policy {policy} ksks {ksks} zsks {zsks}"
)
shutil.copyfile(f"ns2/{zone}.db.in2", f"ns2/{zone}.db")
- servers["ns2"].rndc(f"reload {zone}", log=False)
+ servers["ns2"].rndc(f"reload {zone}")
def update_is_signed():
parts = update.split()
os.remove(k.statefile)
with servers["ns3"].watch_log_from_here() as watcher:
- servers["ns3"].rndc(f"loadkeys {zone}", log=False)
+ servers["ns3"].rndc(f"loadkeys {zone}")
watcher.wait_for_line(
f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
)
isctest.kasp.check_dnssecstatus(ns4, zone, keys, policy=policy, view=view)
isctest.kasp.check_apex(ns4, zone, keys, [], tsig=tsig)
# check zonestatus
- response = ns4.rndc(f"zonestatus {zone} in {view}", log=False)
- assert f"dynamic: {dynamic}" in response
- assert f"inline signing: {inline_signing}" in response
+ response = ns4.rndc(f"zonestatus {zone} in {view}")
+ assert f"dynamic: {dynamic}" in response.out
+ assert f"inline signing: {inline_signing}" in response.out
# check subdomain
fqdn = f"{zone}."
qname = f"view.{zone}."
state_stat = os.stat(key.statefile)
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"loadkeys {zone}", log=False)
+ ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
assert privkey_stat.st_mtime == os.stat(key.privatefile).st_mtime
# again
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"loadkeys {zone}", log=False)
+ ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
assert privkey_stat.st_mtime == os.stat(key.privatefile).st_mtime
# modify unsigned zone file and check that new record is signed.
isctest.log.info("check that an updated zone signs the new record")
shutil.copyfile("ns3/template2.db.in", f"ns3/{zone}.db")
- ns3.rndc(f"reload {zone}", log=False)
+ ns3.rndc(f"reload {zone}")
def update_is_signed():
parts = update.split()
shutil.move(f"{key.privatefile}", f"{key.path}.offline")
expectmsg = "zone_rekey:zone_verifykeys failed: some key files are missing"
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"loadkeys {zone}", log=False)
+ ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"zone {zone}/IN (signed): {expectmsg}")
# Nothing has changed.
expected[0].private = False # noqa
# Update zone with freeze/thaw.
isctest.log.info("check dynamic zone is updated and signed after freeze and thaw")
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"freeze {zone}", log=False)
+ ns3.rndc(f"freeze {zone}")
watcher.wait_for_line(f"freezing zone '{zone}/IN': success")
time.sleep(1)
time.sleep(1)
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"thaw {zone}", log=False)
+ ns3.rndc(f"thaw {zone}")
watcher.wait_for_line(f"thawing zone '{zone}/IN': success")
expected_updates = [f"a.{zone}. A 10.0.0.1", f"d.{zone}. A 10.0.0.44"]
"check dynamic inline-signed zone is updated and signed after freeze and thaw"
)
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"freeze {zone}", log=False)
+ ns3.rndc(f"freeze {zone}")
watcher.wait_for_line(f"freezing zone '{zone}/IN': success")
time.sleep(1)
time.sleep(1)
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"thaw {zone}", log=False)
+ ns3.rndc(f"thaw {zone}")
watcher.wait_for_line(f"thawing zone '{zone}/IN': success")
expected_updates = [f"a.{zone}. A 10.0.0.11", f"d.{zone}. A 10.0.0.44"]
ksk = ksks[0]
isctest.log.info("check if checkds -publish correctly sets DSPublish")
- ns3.rndc(f"dnssec -checkds -when {now} published {zone}", log=False)
+ ns3.rndc(f"dnssec -checkds -when {now} published {zone}")
metadata = f"DSPublish: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "rumoured"
isctest.kasp.check_keys(zone, keys, expected)
isctest.log.info("check if checkds -withdrawn correctly sets DSRemoved")
- ns3.rndc(f"dnssec -checkds -when {now} withdrawn {zone}", log=False)
+ ns3.rndc(f"dnssec -checkds -when {now} withdrawn {zone}")
metadata = f"DSRemoved: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "unretentive"
isctest.log.info("check invalid checkds commands")
def check_error():
- response = ns3.rndc(test["command"], log=False)
- assert test["error"] in response
+ response = ns3.rndc(test["command"], stderr=subprocess.STDOUT)
+ assert test["error"] in response.out
test_cases = [
{
check_error()
isctest.log.info("check if checkds -publish -key correctly sets DSPublish")
- ns3.rndc(f"dnssec -checkds -when {now} -key {ksk.tag} published {zone}", log=False)
+ ns3.rndc(f"dnssec -checkds -when {now} -key {ksk.tag} published {zone}")
metadata = f"DSPublish: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "rumoured"
isctest.log.info("check if checkds -withdrawn -key correctly sets DSRemoved")
ksk = ksks[1]
- ns3.rndc(f"dnssec -checkds -when {now} -key {ksk.tag} withdrawn {zone}", log=False)
+ ns3.rndc(f"dnssec -checkds -when {now} -key {ksk.tag} withdrawn {zone}")
metadata = f"DSRemoved: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[1].metadata["DSState"] = "unretentive"
ksk = keys[0]
isctest.log.info("check if checkds -publish csk correctly sets DSPublish")
- ns3.rndc(f"dnssec -checkds -when {now} published {zone}", log=False)
+ ns3.rndc(f"dnssec -checkds -when {now} published {zone}")
metadata = f"DSPublish: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "rumoured"
isctest.kasp.check_keys(zone, keys, expected)
isctest.log.info("check if checkds -withdrawn csk correctly sets DSRemoved")
- ns3.rndc(f"dnssec -checkds -when {now} withdrawn {zone}", log=False)
+ ns3.rndc(f"dnssec -checkds -when {now} withdrawn {zone}")
metadata = f"DSRemoved: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "unretentive"
# Load again, make sure the purged key is not an issue when verifying keys.
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"loadkeys {zone}", log=False)
+ ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
# Reconfig, make sure the purged key is not an issue when verifying keys.
shutil.copyfile("ns4/purgekeys2.conf", "ns4/purgekeys.conf")
with ns4.watch_log_from_here() as watcher:
- ns4.rndc("reconfig", log=False)
+ ns4.rndc("reconfig")
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN/example1 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
shutil.copyfile(f"ns6/{zone}2.db.in", f"ns6/{zone}.db")
with ns6.watch_log_from_here() as watcher:
- ns6.rndc("reload", log=False)
+ ns6.rndc("reload")
watcher.wait_for_line("all zones loaded")
newttl = 300
# Force step.
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"dnssec -step {zone}", log=False)
+ ns3.rndc(f"dnssec -step {zone}")
watcher.wait_for_line(
f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
)
# Load keys.
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"loadkeys {zone}", log=False)
+ ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(blockmsg)
# Check keys again, make sure no new keys are created.
# Force step.
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"dnssec -step {zone}", log=False)
+ ns3.rndc(f"dnssec -step {zone}")
watcher.wait_for_line(
f"zone {zone}/IN (signed): zone_rekey done: key {tag}/ECDSAP256SHA256"
)
def test_dig_tcp_keepalive_handling(named_port, ns2):
def get_keepalive_options_received():
- ns2.rndc("stats", log=False)
+ ns2.rndc("stats")
options_received = 0
with open("ns2/named.stats", "r", encoding="utf-8") as ns2_stats_file:
for line in ns2_stats_file:
)
isctest.log.info("check a re-configured keepalive value")
- response = ns2.rndc("tcp-timeouts 300 300 300 200 100", log=False)
- assert "tcp-initial-timeout=300" in response
- assert "tcp-idle-timeout=300" in response
- assert "tcp-keepalive-timeout=300" in response
- assert "tcp-advertised-timeout=200" in response
- assert "tcp-primaries-timeout=100" in response
+ response = ns2.rndc("tcp-timeouts 300 300 300 200 100")
+ assert "tcp-initial-timeout=300" in response.out
+ assert "tcp-idle-timeout=300" in response.out
+ assert "tcp-keepalive-timeout=300" in response.out
+ assert "tcp-advertised-timeout=200" in response.out
+ assert "tcp-primaries-timeout=100" in response.out
assert (
"; TCP-KEEPALIVE: 20.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.2").out
f"addzone {zone} "
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
- + "};",
- log=False,
+ + "};"
)
# import skr
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
- ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
+ ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
- log=False,
)
# import skr
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
- ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
+ ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
- log=False,
)
# import skr
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
- ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
+ ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
- log=False,
)
# import skr
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
- server.rndc(f"skr -import {skr_fname} {zone}", log=False)
+ server.rndc(f"skr -import {skr_fname} {zone}")
# test that rekey logs error
time_remaining = 10
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
- log=False,
)
# import skr
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
- ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
+ ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
- log=False,
)
# import skr
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
- ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
+ ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
- log=False,
)
# import skr
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
- ns1.rndc(f"skr -import {skr_fname} {zone}", log=False)
+ ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
- server.rndc(f"loadkeys {zone}", log=False)
+ server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
- server.rndc(f"loadkeys {zone}", log=False)
+ server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
- server.rndc(f"loadkeys {zone}", log=False)
+ server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
- server.rndc(f"loadkeys {zone}", log=False)
+ server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
- server.rndc(f"loadkeys {zone}", log=False)
+ server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
- server.rndc(f"loadkeys {zone}", log=False)
+ server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
- server.rndc(f"loadkeys {zone}", log=False)
+ server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
- server.rndc(f"loadkeys {zone}", log=False)
+ server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
- server.rndc(f"loadkeys {zone}", log=False)
+ server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
# Using rndc signing -nsec3param (should fail)
isctest.log.info(f"use rndc signing -nsec3param {zone} to change NSEC3 settings")
response = ns3.rndc(f"signing -nsec3param 1 1 12 ffff {zone}")
- assert "zone uses dnssec-policy, use rndc dnssec command instead" in response
+ assert "zone uses dnssec-policy, use rndc dnssec command instead" in response.out
# information regarding copyright ownership.
import os
import re
import time
import isctest
-# helper functions
-def hasmatch(regex, blob):
- return re.search(regex, blob, flags=re.MULTILINE)
-
-
def active(blob):
return len([x for x in blob.splitlines() if " expiry" in x])
def test_nta_validate_except(servers):
ns4 = servers["ns4"]
- response = ns4.rndc("secroots -", log=False)
- assert hasmatch("^corp: permanent", response)
+ response = ns4.rndc("secroots -")
+ # re.search with MULTILINE preserves the anchored-line semantics of the
+ # removed hasmatch() helper; a compiled Pattern cannot be used with "in".
+ assert re.search("^corp: permanent", response.out, re.MULTILINE)
# check insecure local domain works with validate-except
m = isctest.query.create("www.corp", "NS")
ns4 = servers["ns4"]
# no nta lifetime specified:
- response = ns4.rndc("nta -l '' foo", ignore_errors=True, log=False)
- assert "'nta' failed: bad ttl" in response
+ response = ns4.rndc("nta -l '' foo", raise_on_exception=False)
+ assert "'nta' failed: bad ttl" in response.err
# bad nta lifetime:
- response = ns4.rndc("nta -l garbage foo", ignore_errors=True, log=False)
- assert "'nta' failed: bad ttl" in response
+ response = ns4.rndc("nta -l garbage foo", raise_on_exception=False)
+ assert "'nta' failed: bad ttl" in response.err
# excessive nta lifetime:
- response = ns4.rndc("nta -l 7d1h foo", ignore_errors=True, log=False)
- assert "'nta' failed: out of range" in response
+ response = ns4.rndc("nta -l 7d1h foo", raise_on_exception=False)
+ assert "'nta' failed: out of range" in response.err
def test_nta_install(servers):
global start
ns4 = servers["ns4"]
- ns4.rndc("nta -f -l 20s bogus.example", log=False)
- ns4.rndc("nta badds.example", log=False)
+ ns4.rndc("nta -f -l 20s bogus.example")
+ ns4.rndc("nta badds.example")
# NTAs should persist after reconfig
- with ns4.watch_log_from_here() as watcher:
- ns4.reconfigure(log=False)
- watcher.wait_for_line("any newly configured zones are now loaded")
+ ns4.reconfigure()
- response = ns4.rndc("nta -d", log=False)
- assert len(response.splitlines()) == 3
+ response = ns4.rndc("nta -d")
+ assert len(response.out.splitlines()) == 3
- ns4.rndc("nta secure.example", log=False)
- ns4.rndc("nta fakenode.secure.example", log=False)
+ ns4.rndc("nta secure.example")
+ ns4.rndc("nta fakenode.secure.example")
with ns4.watch_log_from_here() as watcher:
- ns4.rndc("reload", log=False)
+ ns4.rndc("reload")
watcher.wait_for_line("all zones loaded")
- response = ns4.rndc("nta -d", log=False)
- assert len(response.splitlines()) == 5
+ response = ns4.rndc("nta -d")
+ assert len(response.out.splitlines()) == 5
start = time.time()
isctest.check.noadflag(res)
ns4 = servers["ns4"]
- response = ns4.rndc("secroots -", log=False)
- assert hasmatch("^bogus.example: expiry", response)
- assert hasmatch("^badds.example: expiry", response)
- assert hasmatch("^secure.example: expiry", response)
- assert hasmatch("^fakenode.secure.example: expiry", response)
+ response = ns4.rndc("secroots -")
+ assert re.search("^bogus.example: expiry", response.out, re.MULTILINE)
+ assert re.search("^badds.example: expiry", response.out, re.MULTILINE)
+ assert re.search("^secure.example: expiry", response.out, re.MULTILINE)
+ assert re.search("^fakenode.secure.example: expiry", response.out, re.MULTILINE)
# secure.example and badds.example used the default nta-duration
# (configured as 12s in ns4/named1.conf), but the nta recheck interval
if delay > 0:
time.sleep(delay)
- response = ns4.rndc("nta -d", log=False)
- assert active(response) <= 2
+ response = ns4.rndc("nta -d")
+ assert active(response.out) <= 2
- response = ns4.rndc("secroots -", log=False)
- assert hasmatch("bogus.example: expiry", response)
- assert not hasmatch("badds.example: expiry", response)
+ response = ns4.rndc("secroots -")
+ assert re.search("bogus.example: expiry", response.out, re.MULTILINE)
+ assert not re.search("badds.example: expiry", response.out, re.MULTILINE)
m = isctest.query.create("b.bogus.example", "A")
res = isctest.query.tcp(m, "10.53.0.4")
if delay > 0:
time.sleep(delay)
- response = ns4.rndc("nta -d", log=False)
- assert active(response) == 0
+ response = ns4.rndc("nta -d")
+ assert active(response.out) == 0
m = isctest.query.create("d.secure.example", "A")
res = isctest.query.tcp(m, "10.53.0.4")
def test_nta_removals(servers):
ns4 = servers["ns4"]
- ns4.rndc("nta badds.example", log=False)
+ ns4.rndc("nta badds.example")
- response = ns4.rndc("nta -d", log=False)
- assert hasmatch("^badds.example/_default: expiry", response)
+ response = ns4.rndc("nta -d")
+ assert Re("^badds.example/_default: expiry") in response.out
m = isctest.query.create("a.badds.example", "A")
res = isctest.query.tcp(m, "10.53.0.4")
isctest.check.noerror(res)
isctest.check.noadflag(res)
- response = ns4.rndc("nta -remove badds.example", log=False)
- assert "Negative trust anchor removed: badds.example" in response
+ response = ns4.rndc("nta -remove badds.example")
+ assert "Negative trust anchor removed: badds.example" in response.out
- response = ns4.rndc("nta -d", log=False)
- assert not hasmatch("^badds.example/_default: expiry", response)
+ response = ns4.rndc("nta -d")
+ assert Re("^badds.example/_default: expiry") not in response.out
res = isctest.query.tcp(m, "10.53.0.4")
isctest.check.servfail(res)
isctest.check.noadflag(res)
# remove non-existent NTA three times
- ns4.rndc("nta -r foo", log=False)
- ns4.rndc("nta -remove foo", log=False)
- response = ns4.rndc("nta -r foo", log=False)
- assert "not found" in response
+ ns4.rndc("nta -r foo")
+ ns4.rndc("nta -remove foo")
+ response = ns4.rndc("nta -r foo")
+ assert "not found" in response.out
def test_nta_restarts(servers):
# test NTA persistence across restarts
ns4 = servers["ns4"]
- response = ns4.rndc("nta -d", log=False)
- assert active(response) == 0
+ response = ns4.rndc("nta -d")
+ assert active(response.out) == 0
start = time.time()
- ns4.rndc("nta -f -l 30s bogus.example", log=False)
- ns4.rndc("nta -f -l 10s badds.example", log=False)
- response = ns4.rndc("nta -d", log=False)
- assert active(response) == 2
+ ns4.rndc("nta -f -l 30s bogus.example")
+ ns4.rndc("nta -f -l 10s badds.example")
+ response = ns4.rndc("nta -d")
+ assert active(response.out) == 2
# stop the server
ns4.stop()
time.sleep(delay)
ns4.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
- response = ns4.rndc("nta -d", log=False)
- assert active(response) == 1
- assert hasmatch("^bogus.example/_default: expiry", response)
+ response = ns4.rndc("nta -d")
+ assert active(response.out) == 1
+ assert Re("^bogus.example/_default: expiry") in response.out
m = isctest.query.create("a.badds.example", "A")
res = isctest.query.tcp(m, "10.53.0.4")
isctest.check.noerror(res)
isctest.check.noadflag(res)
- ns4.rndc("nta -r bogus.example", log=False)
+ ns4.rndc("nta -r bogus.example")
def test_nta_regular(servers):
# check "regular" attribute in NTA file
ns4 = servers["ns4"]
- response = ns4.rndc("nta -d", log=False)
- assert active(response) == 0
+ response = ns4.rndc("nta -d")
+ assert active(response.out) == 0
# secure.example validates with AD=1
m = isctest.query.create("a.secure.example", "A")
if delay > 0:
time.sleep(delay)
- response = ns4.rndc("nta -d", log=False)
- assert active(response) == 0
+ response = ns4.rndc("nta -d")
+ assert active(response.out) == 0
# NTA lifted; secure.example. flush the cache to trigger a new query,
# and it should now return an AD=1 answer.
- ns4.rndc("flushtree secure.example", log=False)
+ ns4.rndc("flushtree secure.example")
res = isctest.query.tcp(m, "10.53.0.4")
isctest.check.noerror(res)
isctest.check.adflag(res)
ns4 = servers["ns4"]
# just to be certain, clean up any existing NTA first
- ns4.rndc("nta -r secure.example", log=False)
+ ns4.rndc("nta -r secure.example")
- response = ns4.rndc("nta -d", log=False)
- assert active(response) == 0
+ response = ns4.rndc("nta -d")
+ assert active(response.out) == 0
# secure.example validates with AD=1
m = isctest.query.create("a.secure.example", "A")
time.sleep(delay)
# NTA lifted; secure.example. should still return an AD=0 answer
- ns4.rndc("flushtree secure.example", log=False)
+ ns4.rndc("flushtree secure.example")
res = isctest.query.tcp(m, "10.53.0.4")
isctest.check.noerror(res)
isctest.check.noadflag(res)
ns4 = servers["ns4"]
# clean up any existing NTA
- ns4.rndc("nta -r secure.example", log=False)
+ ns4.rndc("nta -r secure.example")
# stop the server, update _default.nta, restart
ns4.stop()
ns4.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
# check that NTA lifetime read from file is clamped to 1 week.
- response = ns4.rndc("nta -d", log=False)
- assert active(response) == 1
+ response = ns4.rndc("nta -d")
+ assert active(response.out) == 1
- nta = next((s for s in response.splitlines() if " expiry" in s), None)
+ nta = next((s for s in response.out.splitlines() if " expiry" in s), None)
assert nta is not None
nta = nta.split(" ")
assert abs(nextweek - then < 3610)
# remove the NTA
- ns4.rndc("nta -r secure.example", log=False)
+ ns4.rndc("nta -r secure.example")
def test_nta_forward(servers):
isctest.check.noadflag(res)
# add NTA and expect resolution to succeed
- ns9.rndc("nta badds.example", log=False)
+ ns9.rndc("nta badds.example")
res = isctest.query.tcp(m, "10.53.0.9")
isctest.check.noerror(res)
isctest.check.rr_count_eq(res.answer, 2)
isctest.check.noadflag(res)
# remove NTA and expect resolution to fail again
- ns9.rndc("nta -remove badds.example", log=False)
+ ns9.rndc("nta -remove badds.example")
res = isctest.query.tcp(m, "10.53.0.9")
isctest.check.servfail(res)
isctest.check.empty_answer(res)
isctest.check.refused(res)
# add new zone into the default NZD using "rndc addzone"
- ns1.rndc(f"addzone {zone_data}", log=False)
+ ns1.rndc(f"addzone {zone_data}")
# query for existing zone data
res = isctest.query.tcp(msg, ns1.ip)
isctest.check.noerror(res)
assert res.answer[0].ttl == 300
templates.render("ns1/named.conf", {"wrongoption": True})
- try:
- # The first reload fails, and the old cache list will be preserved
- ns1.rndc("reload")
- except isctest.rndc.RNDCException:
- templates.render("ns1/named.conf", {"wrongoption": False})
- # The second reload succeed, and the cache is still there, as preserved
- # from the old cache list
- ns1.rndc("reload")
- time.sleep(3)
- msg = isctest.query.create("www.example.org.", "A")
- res = isctest.query.udp(msg, "10.53.0.1")
- isctest.check.noerror(res)
- # The ttl being lower than 300 (provided by fake authoritative) proves
- # the cache is still in use
- assert res.answer[0].ttl < 300
+
+ # The first reload fails, and the old cache list will be preserved
+ cmd = ns1.rndc("reload", raise_on_exception=False)
+ assert cmd.rc != 0
+
+ templates.render("ns1/named.conf", {"wrongoption": False})
+ # The second reload succeed, and the cache is still there, as preserved
+ # from the old cache list
+ ns1.rndc("reload")
+ time.sleep(3)
+ msg = isctest.query.create("www.example.org.", "A")
+ res = isctest.query.udp(msg, "10.53.0.1")
+ isctest.check.noerror(res)
+
+ # The ttl being lower than 300 (provided by fake authoritative) proves
+ # the cache is still in use
+ assert res.answer[0].ttl < 300
# Force full resign and check all signatures have been replaced.
with ns3.watch_log_from_here() as watcher:
- ns3.rndc(f"sign {zone}", log=False)
+ ns3.rndc(f"sign {zone}")
watcher.wait_for_line(f"zone_needdump: zone {zone}/IN (signed): enter")
step["smooth"] = False
# Try to schedule a ZSK rollover for an inactive key (should fail).
zsk = expected[3].key
response = ns3.rndc(f"dnssec -rollover -key {zsk.tag} {zone}")
- assert "key is not actively signing" in response
+ assert "key is not actively signing" in response.out
def test_rollover_manual_zrrsig_rumoured(ns3):
res = isctest.query.udp(msg, "10.53.0.1")
isctest.check.rcode(res, dns.rcode.NOERROR)
- effectiveconfig = ns1.rndc("showconf -effective", log=False)
- assert 'zone "example.com"' in effectiveconfig
- assert 'view "_bind" chaos {' in effectiveconfig
+ effectiveconfig = ns1.rndc("showconf -effective")
+ assert 'zone "example.com"' in effectiveconfig.out
+ assert 'view "_bind" chaos {' in effectiveconfig.out
# builtin-trust-anchors is non documented and internal clause only, it must
# not be visible.
- assert "builtin-trust-anchors" not in effectiveconfig
+ assert "builtin-trust-anchors" not in effectiveconfig.out
# Dynamically added zones are not visible from the effectiveconfig
zonedata = '"added.example" { type primary; file "example.db"; };'
- ns1.rndc(f"addzone {zonedata}", log=False)
+ ns1.rndc(f"addzone {zonedata}")
msg = isctest.query.create("a.added.example", "A")
res = isctest.query.udp(msg, "10.53.0.1")
isctest.check.rcode(res, dns.rcode.NOERROR)
- effectiveconfig = ns1.rndc("showconf -effective", log=False)
- assert 'zone "added.example"' not in effectiveconfig
+ effectiveconfig = ns1.rndc("showconf -effective")
+ assert 'zone "added.example"' not in effectiveconfig.out
- userconfig = ns1.rndc("showconf -user", log=False)
- assert 'zone "example.com"' in userconfig
- assert 'view "_bind" chaos {' not in userconfig
+ userconfig = ns1.rndc("showconf -user")
+ assert 'zone "example.com"' in userconfig.out
+ assert 'view "_bind" chaos {' not in userconfig.out
- builtinconfig = ns1.rndc("showconf -builtin", log=False)
- assert len(userconfig.split()) < len(builtinconfig.split())
- assert len(builtinconfig.split()) < len(effectiveconfig.split())
+ builtinconfig = ns1.rndc("showconf -builtin")
+ assert len(userconfig.out.split()) < len(builtinconfig.out.split())
+ assert len(builtinconfig.out.split()) < len(effectiveconfig.out.split())
# Errors handling
- error_msg = ""
-
- try:
- ns1.rndc("showconf -idontexist", log=False)
- except isctest.rndc.RNDCException as e:
- error_msg = str(e)
- assert error_msg == "rndc: 'showconf' failed: syntax error\n"
-
- try:
- ns1.rndc("showconf", log=False)
- except isctest.rndc.RNDCException as e:
- error_msg = str(e)
- assert error_msg == "rndc: 'showconf' failed: unexpected end of input\n"
+ response = ns1.rndc("showconf -idontexist", raise_on_exception=False)
+ assert response.rc != 0
+ assert "rndc: 'showconf' failed: syntax error" in response.err
+
+ response = ns1.rndc("showconf", raise_on_exception=False)
+ assert response.rc != 0
+ assert "rndc: 'showconf' failed: unexpected end of input" in response.err
# helper function, 'command' is the rndc command to run
def launch_rndc(command):
- try:
- instance.rndc(command, log=False)
- return 0
- except isctest.rndc.RNDCException:
- return -1
+ ret = instance.rndc(command, raise_on_exception=False)
+ return 0 if ret.rc == 0 else -1
# We're going to execute queries in parallel by means of a thread pool.
# dnspython functions block, so we need to circumvent that.
# information regarding copyright ownership.
import concurrent.futures
-import os
import time
import dns.update
def rndc_loop(test_state, server):
- rndc = os.getenv("RNDC")
- port = os.getenv("CONTROLPORT")
-
- cmdline = [
- rndc,
- "-c",
- "../_common/rndc.conf",
- "-p",
- port,
- "-s",
- server,
- "reload",
- ]
-
while not test_state["finished"]:
- isctest.run.cmd(cmdline, raise_on_exception=False)
+ server.rndc("reload", raise_on_exception=False)
time.sleep(1)
def test_synthrecord_inview(ns1, templates):
templates.render("ns1/named.conf", {"inview": True})
with ns1.watch_log_from_here() as watcher:
- try:
- ns1.rndc("reconfig")
- except isctest.rndc.RNDCException:
- watcher.wait_for_line("'synthrecord' must be configured as a zone plugin")
+ cmd = ns1.rndc("reconfig", raise_on_exception=False)
+ assert cmd.rc != 0
+ watcher.wait_for_line("'synthrecord' must be configured as a zone plugin")
templates.render("ns2/named.conf", {"zone_names": zone_names})
shutil.copyfile("ns2/zone.db.in", f"ns2/{name}.db")
with ns2.watch_log_from_here() as watcher:
- ns2.rndc("reconfig", log=False)
+ ns2.rndc("reconfig")
log_seq = ["any newly configured zones are now loaded", "running"]
watcher.wait_for_sequence(log_seq)