watcher.wait_for_line("trust-anchor-telemetry '_ta-")
# check that _ta-AAAA trust-anchor-telemetry are not sent when disabled
- ns1.log.prohibit("sending trust-anchor-telemetry query '_ta")
+ assert "sending trust-anchor-telemetry query '_ta" not in ns1.log
# check that KEY-TAG (ednsopt 14) trust-anchor-telemetry queries are
# logged. this matches "dig . dnskey +ednsopt=KEY-TAG:ffff":
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
+from re import compile as Re
import os
-import re
import shutil
import time
)
-# helper functions
-def grep_q(regex, filename):
- with open(filename, "r", encoding="utf-8") as f:
- blob = f.read().splitlines()
- results = [x for x in blob if re.search(regex, x)]
- return len(results) != 0
-
-
def getfrom(file):
with open(file, encoding="utf-8") as f:
return f.read().strip()
isctest.check.servfail(res2)
-def test_excessive_nsec3_iterations():
- assert grep_q(
- "zone too-many-iterations/IN: excessive NSEC3PARAM iterations", "ns2/named.run"
- )
- assert grep_q(
- "zone too-many-iterations/IN: excessive NSEC3PARAM iterations", "ns3/named.run"
- )
+def test_excessive_nsec3_iterations(ns2, ns3):
+ msg = "zone too-many-iterations/IN: excessive NSEC3PARAM iterations"
+ assert msg in ns2.log
+ assert msg in ns3.log
# check fallback to insecure with NSEC3 iterations is too high
msg = isctest.query.create("does-not-exist.too-many-iterations", "A")
def test_cache(ns4):
# check that key id's are logged when dumping the cache
ns4.rndc("dumpdb -cache", log=False)
- assert grep_q("; key id = ", "ns4/named_dump.db")
+ dumpdb = isctest.text.TextFile("ns4/named_dump.db")
+ assert "; key id = " in dumpdb
# check for RRSIG covered type in negative cache
- assert grep_q("; example. RRSIG NSEC ", "ns4/named_dump.db")
+ assert "; example. RRSIG NSEC " in dumpdb
# check validated data are not cached longer than originalttl
msg = isctest.query.create("a.ttlpatch.example", "A")
res = isctest.query.tcp(msg, "10.53.0.4")
isctest.check.servfail(res)
ns4.rndc("dumpdb", log=False)
- grep_q("10.53.0.100", "ns4/named_dump.db")
+ dumpdb = isctest.text.TextFile("ns4/named_dump.db")
+ assert "10.53.0.100" in dumpdb
# then reload server with properly signed zone
shutil.copyfile(
isctest.check.servfail(res)
isctest.check.noadflag(res)
isctest.check.ede(res, EDECode.SIGNATURE_EXPIRED)
- assert grep_q("expired.example/.*: RRSIG has expired", "ns4/named.run")
+ assert Re("expired.example/.*: RRSIG has expired") in ns4.log
# check future signatures do not validate
msg = isctest.query.create("future.example", "SOA")
isctest.check.servfail(res)
isctest.check.noadflag(res)
isctest.check.ede(res, EDECode.SIGNATURE_NOT_YET_VALID)
- assert grep_q(
- "future.example/.*: RRSIG validity period has not begun", "ns4/named.run"
- )
+ assert Re("future.example/.*: RRSIG validity period has not begun") in ns4.log
# check that a dynamic zone with future signatures is re-signed on load
msg = isctest.query.create("managed-future.example", "A")
import dns.message
import dns.rcode
-from .log import debug, info, LogFile, WatchLogFromStart, WatchLogFromHere
+from .log import debug, info, WatchLogFromStart, WatchLogFromHere
from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor
from .run import perl
from .query import udp
+from .text import TextFile
class NamedPorts(NamedTuple):
if ports is None:
ports = NamedPorts.from_env()
self.ports = ports
- self.log = LogFile(os.path.join(identifier, "named.run"))
+ self.log = TextFile(os.path.join(identifier, "named.run"))
self._rndc_executor = rndc_executor or RNDCBinaryExecutor()
self._rndc_logger = rndc_logger
f"{os.environ['srcdir']}/start.pl",
[self.system_test_name, self.identifier] + args,
)
+
+ def __repr__(self):
+ return self.identifier
def check_cdslog_prohibit(server, zone, key, substr):
- server.log.prohibit(
- f"{substr} for key {zone}/{key.algorithm.name}/{key.tag} is now published"
- )
+ msg = f"{substr} for key {zone}/{key.algorithm.name}/{key.tag} is now published"
+ assert msg not in server.log
def check_cdsdelete(rrset, expected):
critical,
)
-from .watchlog import LogFile, WatchLogFromStart, WatchLogFromHere
+from .watchlog import WatchLogFromStart, WatchLogFromHere
import os
import time
-from isctest.text import compile_pattern, FlexPattern, LineReader, LogFile
+from isctest.text import compile_pattern, FlexPattern, LineReader
T = TypeVar("T")
import abc
import re
from re import compile as Re
-from typing import Iterator, Match, Optional, Pattern, TextIO, Union
+from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union
FlexPattern = Union[str, Pattern]
raise TypeError("only string and re.Pattern allowed")
-class LogFile:
+class Grep(abc.ABC):
"""
- Log file wrapper with a path and means to find a string in its contents.
+ Implement a grep-like interface for pattern matching in texts and files.
"""
- def __init__(self, path: str):
- self.path = path
+ @abc.abstractmethod
+ def readlines(self) -> Iterator[str]:
+ raise NotImplementedError
- @property
- def _lines(self) -> Iterator[str]:
- with open(self.path, encoding="utf-8") as f:
- yield from f
+ def igrep(self, pattern: FlexPattern) -> Iterator[Match]:
+ """
+ Iterate over the lines matching the pattern.
+ """
+ regex = compile_pattern(pattern)
+
+ for line in self.readlines():
+ match = regex.search(line)
+ if match:
+ yield match
+
+ def grep(self, pattern: FlexPattern) -> List[Match]:
+ """
+ Get list of lines matching the pattern.
+ """
+ return list(self.igrep(pattern))
- def __contains__(self, substring: str) -> bool:
+ def __contains__(self, pattern: FlexPattern) -> bool:
"""
- Return whether any of the lines in the log contains a given string.
+ Return whether any of the lines in the log contains matches the pattern.
"""
- for line in self._lines:
- if substring in line:
- return True
- return False
+ try:
+ next(self.igrep(pattern))
+ except StopIteration:
+ return False
+ return True
- def expect(self, msg: str):
- """Check the string is present anywhere in the log file."""
- if msg in self:
- return
- assert False, f"log message not found in log {self.path}: {msg}"
- def prohibit(self, msg: str):
- """Check the string is not present in the entire log file."""
- if msg in self:
- assert False, f"forbidden message appeared in log {self.path}: {msg}"
+class TextFile(Grep):
+ """
+ Text file wrapper with grep support.
+ """
+
+ def __init__(self, path: str):
+ self.path = path
+
+ def readlines(self) -> Iterator[str]:
+ with open(self.path, encoding="utf-8") as f:
+ yield from f
+
+ def __repr__(self):
+ return self.path
-class LineReader:
+class LineReader(Grep):
"""
>>> import io
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
- ns3.log.prohibit(msg)
+ assert msg not in ns3.log
def test_kasp_purge_keys(ns4):
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN/example1 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
- ns4.log.prohibit(msg)
+ assert msg not in ns4.log
msg = f"zone {zone}/IN/example2 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
- ns4.log.prohibit(msg)
+ assert msg not in ns4.log
def test_kasp_reload_restart(ns6):
# Key rollover should have been be blocked.
tag = expected[1].key.tag
blockmsg = f"keymgr-manual-mode: block ZSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
- ns3.log.expect(blockmsg)
+ assert blockmsg in ns3.log
# Remove files.
for key in ksks + zsks:
isctest.log.info(
f"- zone {zone} {server.identifier}: make sure we did not try to sign with the keys added with nsupdate"
)
- server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}")
+ assert f"dns_zone_findkeys: error reading ./K{zone}" not in server.log
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
# Check again.
isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run")
check_dnssec(server, zone, keys + extra_keys, expected + extra)
- server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}")
+ assert f"dns_zone_findkeys: error reading ./K{zone}" not in server.log
def _check_remove_zsk_fail(
tag = keys[0].key.tag
msg1 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{tag} (CSK)"
msg2 = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})"
- ns6.log.expect(msg1)
- ns6.log.expect(msg2)
+ assert msg1 in ns6.log
+ assert msg2 in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
- ns6.log.expect(msg)
+ assert msg in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/RSASHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
- ns6.log.expect(msg)
+ assert msg in ns6.log
# Force step.
tag = keys[1].key.tag
msg1 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{ktag} (KSK)"
msg2 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{ztag} (ZSK)"
msg3 = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})" # twice
- ns6.log.expect(msg1)
- ns6.log.expect(msg2)
- ns6.log.expect(msg3)
+ assert msg1 in ns6.log
+ assert msg2 in ns6.log
+ assert len(ns6.log.grep(msg3)) >= 2
# Force step.
with ns6.watch_log_from_here() as watcher:
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
- ns6.log.expect(msg)
+ assert msg in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
ztag = keys[1].key.tag
msg1 = f"keymgr-manual-mode: block transition KSK {zone}/RSASHA256/{ktag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition ZSK {zone}/RSASHA256/{ztag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
- ns6.log.expect(msg1)
- ns6.log.expect(msg2)
+ assert msg1 in ns6.log
+ assert msg2 in ns6.log
# Force step.
ktag = keys[3].key.tag
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block CSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
-
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
tag = keys[1].key.tag
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block CSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
tag = keys[0].key.tag
msg1 = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
- ns3.log.expect(msg1)
- ns3.log.expect(msg2)
+ assert msg1 in ns3.log
+ assert msg2 in ns3.log
# Force step.
tag = keys[1].key.tag
# Check logs.
msg = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block KSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
tag = keys[1].key.tag
msg1 = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
- ns3.log.expect(msg1)
- ns3.log.expect(msg2)
+ assert msg1 in ns3.log
+ assert msg2 in ns3.log
# Force step.
tag = keys[2].key.tag
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block ZSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition ZSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition ZSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
- ns3.log.expect(msg)
+ assert msg in ns3.log
# Force step.
tag = keys[2].key.tag
"zone warn/IN: loaded serial 0",
"zone nowarn/IN: loaded serial 0",
):
- ns1.log.expect(msg)
+ assert msg in ns1.log
for msg in (
"zone nowarn/IN: 'y.nowarn' found type SPF record but no SPF TXT record found",
"zone warn/IN: 'warn' found type SPF record but no SPF TXT record found",
"zone nowarn/IN: 'nowarn' found type SPF record but no SPF TXT record found",
):
- ns1.log.prohibit(msg)
+ assert msg not in ns1.log